Просмотр исходного кода

Split update replica task

In order to have atomic tasks, the update replica task is
split into two separate tasks for source and destination providers.
Ionut Balutoiu 6 лет назад
Родитель
Сommit
d0c35e0a07

+ 9 - 3
coriolis/conductor/rpc/server.py

@@ -964,7 +964,9 @@ class ConductorServerEndpoint(object):
                 LOG.debug(
                     "No 'transfer_result' was returned for task type '%s' "
                     "for transfer action '%s'", task_type, execution.action_id)
-        elif task_type == constants.TASK_TYPE_UPDATE_REPLICA:
+        elif task_type in (
+                constants.TASK_TYPE_UPDATE_SOURCE_REPLICA,
+                constants.TASK_TYPE_UPDATE_DESTINATION_REPLICA):
             # NOTE: perform the actual db update on the Replica's properties:
             db_api.update_replica(ctxt, execution.action_id, task_info)
             # NOTE: remember to update the `volumes_info`:
@@ -1135,9 +1137,13 @@ class ConductorServerEndpoint(object):
             inst_info_copy = copy.deepcopy(replica.info[instance])
             inst_info_copy.update(properties)
             replica.info[instance] = inst_info_copy
-            self._create_task(
-                instance, constants.TASK_TYPE_UPDATE_REPLICA,
+            update_source_replica_task = self._create_task(
+                instance, constants.TASK_TYPE_UPDATE_SOURCE_REPLICA,
                 execution)
+            self._create_task(
+                instance, constants.TASK_TYPE_UPDATE_DESTINATION_REPLICA,
+                execution,
+                depends_on=[update_source_replica_task.id])
         LOG.debug(
             "Replica '%s' info post-replica-update: %s",
             replica_id, replica.info)

+ 2 - 1
coriolis/constants.py

@@ -54,7 +54,8 @@ TASK_TYPE_VALIDATE_REPLICA_DESTINATION_INPUTS = (
     "VALIDATE_REPLICA_DESTINATION_INPUTS")
 TASK_TYPE_VALIDATE_REPLICA_DEPLOYMENT_INPUTS = (
     "VALIDATE_REPLICA_DEPLOYMENT_INPUTS")
-TASK_TYPE_UPDATE_REPLICA = "UPDATE_REPLICA"
+TASK_TYPE_UPDATE_SOURCE_REPLICA = "UPDATE_SOURCE_REPLICA"
+TASK_TYPE_UPDATE_DESTINATION_REPLICA = "UPDATE_DESTINATION_REPLICA"
 
 PROVIDER_TYPE_IMPORT = 1
 PROVIDER_TYPE_EXPORT = 2

+ 4 - 2
coriolis/tasks/factory.py

@@ -70,8 +70,10 @@ _TASKS_MAP = {
         replica_tasks.ValidateReplicaExecutionDestinationInputsTask,
     constants.TASK_TYPE_VALIDATE_REPLICA_DEPLOYMENT_INPUTS:
         replica_tasks.ValidateReplicaDeploymentParametersTask,
-    constants.TASK_TYPE_UPDATE_REPLICA:
-        replica_tasks.UpdateReplicaTask
+    constants.TASK_TYPE_UPDATE_SOURCE_REPLICA:
+        replica_tasks.UpdateSourceReplicaTask,
+    constants.TASK_TYPE_UPDATE_DESTINATION_REPLICA:
+        replica_tasks.UpdateDestinationReplicaTask
 }
 
 

+ 60 - 51
coriolis/tasks/replica_tasks.py

@@ -428,65 +428,74 @@ class ValidateReplicaDeploymentParametersTask(base.TaskRunner):
         return task_info
 
 
-class UpdateReplicaTask(base.TaskRunner):
+class UpdateSourceReplicaTask(base.TaskRunner):
     def run(self, ctxt, instance, origin, destination, task_info,
             event_handler):
-        destination_provider = None
-        source_provider = None
-        new_source_environment = task_info.get('source_environment')
-        new_destination_environment = task_info.get('destination_environment')
-
-        if new_source_environment:
-            source_provider = providers_factory.get_provider(
-                origin["type"], constants.PROVIDER_TYPE_REPLICA_UPDATE,
-                event_handler, raise_if_not_found=False)
-            if not source_provider:
-                raise exception.CoriolisException(
-                    "Replica source provider plugin for '%s' does not support"
-                    " updating Replicas." % origin["type"])
-
-        if new_destination_environment:
-            destination_provider = providers_factory.get_provider(
-                destination["type"], constants.PROVIDER_TYPE_REPLICA_UPDATE,
-                event_handler, raise_if_not_found=False)
-            if not destination_provider:
-                raise exception.CoriolisException(
-                    "Replica destination provider plugin for '%s' does not "
-                    "support updating Replicas." % destination["type"])
+        event_manager = events.EventManager(event_handler)
+        new_source_env = task_info.get('source_environment', {})
+        if not new_source_env:
+            event_manager.progress_update(
+                "No new source environment options provided")
+            return task_info
+
+        source_provider = providers_factory.get_provider(
+            origin["type"], constants.PROVIDER_TYPE_REPLICA_UPDATE,
+            event_handler, raise_if_not_found=False)
+        if not source_provider:
+            raise exception.CoriolisException(
+                "Replica source provider plugin for '%s' does not support"
+                " updating Replicas" % origin["type"])
 
         origin_connection_info = base.get_connection_info(ctxt, origin)
+        export_info = task_info.get("export_info", {})
+        volumes_info = task_info.get("volumes_info", {})
+
+        LOG.info("Checking source provider environment params")
+        # NOTE: the `source_environment` in the `origin` is the one set
+        # in the dedicated DB column of the Replica and thus stores
+        # the previous value of it:
+        old_source_env = origin.get('source_environment', {})
+        volumes_info = source_provider.check_update_environment_params(
+            ctxt, origin_connection_info, export_info, volumes_info,
+            old_source_env, new_source_env)
+
+        task_info['volumes_info'] = volumes_info
+
+        return task_info
+
+
+class UpdateDestinationReplicaTask(base.TaskRunner):
+    def run(self, ctxt, instance, origin, destination, task_info,
+            event_handler):
+        event_manager = events.EventManager(event_handler)
+        new_destination_env = task_info.get('destination_environment', {})
+        if not new_destination_env:
+            event_manager.progress_update(
+                "No new destination environment options provided")
+            return task_info
+
+        destination_provider = providers_factory.get_provider(
+            destination["type"], constants.PROVIDER_TYPE_REPLICA_UPDATE,
+            event_handler, raise_if_not_found=False)
+        if not destination_provider:
+            raise exception.CoriolisException(
+                "Replica destination provider plugin for '%s' does not "
+                "support updating Replicas" % destination["type"])
+
         destination_connection_info = base.get_connection_info(
             ctxt, destination)
         export_info = task_info.get("export_info", {})
         volumes_info = task_info.get("volumes_info", {})
 
-        if source_provider:
-            LOG.info("Checking source provider environment params")
-            # NOTE: the `source_environment` in the `origin` is the one set
-            # in the dedicated DB column of the Replica and thus stores
-            # the previous value of it:
-            old_source_environment = origin.get('source_environment', {})
-            new_source_environment = task_info.get('source_environment', {})
-            source_provider.check_update_environment_params(
-                ctxt, origin_connection_info, export_info, volumes_info,
-                old_source_environment, new_source_environment)
-
-        if destination_provider:
-            LOG.info("Checking destination provider environment params")
-            # NOTE: the `target_environment` in the `destination` is the one
-            # set in the dedicated DB column of the Replica and thus stores
-            # the previous value of it:
-            old_destination_environment = destination.get(
-                'target_environment', {})
-            new_destination_environment = task_info.get(
-                'destination_environment', {})
-
-            volumes_info = (
-                destination_provider.check_update_environment_params(
-                    ctxt, destination_connection_info, export_info,
-                    volumes_info, old_destination_environment,
-                    new_destination_environment))
-
-            task_info['volumes_info'] = volumes_info
+        LOG.info("Checking destination provider environment params")
+        # NOTE: the `target_environment` in the `destination` is the one
+        # set in the dedicated DB column of the Replica and thus stores
+        # the previous value of it:
+        old_destination_env = destination.get('target_environment', {})
+        volumes_info = destination_provider.check_update_environment_params(
+            ctxt, destination_connection_info, export_info, volumes_info,
+            old_destination_env, new_destination_env)
+
+        task_info['volumes_info'] = volumes_info
 
         return task_info