Просмотр исходного кода

Merge pull request #40 from ionutbalutoiu/refactor-validation-task

Refactor input params validation and update replica tasks
Nashwan Azhari 6 лет назад
Родитель
Сommit
3f0939a7c0

+ 35 - 12
coriolis/conductor/rpc/server.py

@@ -316,15 +316,25 @@ class ConductorServerEndpoint(object):
         execution.action = replica
 
         for instance in execution.action.instances:
-            validate_replica_inputs_task = self._create_task(
-                instance, constants.TASK_TYPE_VALIDATE_REPLICA_INPUTS,
-                execution)
-
             get_instance_info_task = self._create_task(
                 instance, constants.TASK_TYPE_GET_INSTANCE_INFO,
-                execution, depends_on=[validate_replica_inputs_task.id])
+                execution)
+
+            validate_replica_source_inputs_task = self._create_task(
+                instance,
+                constants.TASK_TYPE_VALIDATE_REPLICA_SOURCE_INPUTS,
+                execution,
+                depends_on=[get_instance_info_task.id])
+
+            validate_replica_destination_inputs_task = self._create_task(
+                instance,
+                constants.TASK_TYPE_VALIDATE_REPLICA_DESTINATION_INPUTS,
+                execution,
+                depends_on=[get_instance_info_task.id])
 
-            depends_on = [get_instance_info_task.id]
+            depends_on = [
+                validate_replica_source_inputs_task.id,
+                validate_replica_destination_inputs_task.id]
             if shutdown_instances:
                 shutdown_instance_task = self._create_task(
                     instance, constants.TASK_TYPE_SHUTDOWN_INSTANCE,
@@ -695,13 +705,20 @@ class ConductorServerEndpoint(object):
             migration, licensing_client.RESERVATION_TYPE_MIGRATION)
 
         for instance in instances:
-            task_validate = self._create_task(
-                instance, constants.TASK_TYPE_VALIDATE_MIGRATION_INPUTS,
+            task_validate_source = self._create_task(
+                instance,
+                constants.TASK_TYPE_VALIDATE_MIGRATION_SOURCE_INPUTS,
                 execution)
 
+            task_validate_destination = self._create_task(
+                instance,
+                constants.TASK_TYPE_VALIDATE_MIGRATION_DESTINATION_INPUTS,
+                execution,
+                depends_on=[task_validate_source.id])
+
             task_export = self._create_task(
                 instance, constants.TASK_TYPE_EXPORT_INSTANCE, execution,
-                depends_on=[task_validate.id])
+                depends_on=[task_validate_destination.id])
 
             if (constants.PROVIDER_TYPE_INSTANCE_FLAVOR in
                     destination_provider_types):
@@ -947,7 +964,9 @@ class ConductorServerEndpoint(object):
                 LOG.debug(
                     "No 'transfer_result' was returned for task type '%s' "
                     "for transfer action '%s'", task_type, execution.action_id)
-        elif task_type == constants.TASK_TYPE_UPDATE_REPLICA:
+        elif task_type in (
+                constants.TASK_TYPE_UPDATE_SOURCE_REPLICA,
+                constants.TASK_TYPE_UPDATE_DESTINATION_REPLICA):
             # NOTE: perform the actual db update on the Replica's properties:
             db_api.update_replica(ctxt, execution.action_id, task_info)
             # NOTE: remember to update the `volumes_info`:
@@ -1118,9 +1137,13 @@ class ConductorServerEndpoint(object):
             inst_info_copy = copy.deepcopy(replica.info[instance])
             inst_info_copy.update(properties)
             replica.info[instance] = inst_info_copy
-            self._create_task(
-                instance, constants.TASK_TYPE_UPDATE_REPLICA,
+            update_source_replica_task = self._create_task(
+                instance, constants.TASK_TYPE_UPDATE_SOURCE_REPLICA,
                 execution)
+            self._create_task(
+                instance, constants.TASK_TYPE_UPDATE_DESTINATION_REPLICA,
+                execution,
+                depends_on=[update_source_replica_task.id])
         LOG.debug(
             "Replica '%s' info post-replica-update: %s",
             replica_id, replica.info)

+ 9 - 3
coriolis/constants.py

@@ -45,11 +45,17 @@ TASK_TYPE_CREATE_REPLICA_DISK_SNAPSHOTS = "CREATE_REPLICA_DISK_SNAPSHOTS"
 TASK_TYPE_DELETE_REPLICA_DISK_SNAPSHOTS = "DELETE_REPLICA_DISK_SNAPSHOTS"
 TASK_TYPE_RESTORE_REPLICA_DISK_SNAPSHOTS = "RESTORE_REPLICA_DISK_SNAPSHOTS"
 TASK_TYPE_GET_OPTIMAL_FLAVOR = "GET_OPTIMAL_FLAVOR"
-TASK_TYPE_VALIDATE_MIGRATION_INPUTS = "VALIDATE_MIGRATION_INPUTS"
-TASK_TYPE_VALIDATE_REPLICA_INPUTS = "VALIDATE_REPLICA_INPUTS"
+TASK_TYPE_VALIDATE_MIGRATION_SOURCE_INPUTS = (
+    "VALIDATE_MIGRATION_SOURCE_INPUTS")
+TASK_TYPE_VALIDATE_MIGRATION_DESTINATION_INPUTS = (
+    "VALIDATE_MIGRATION_DESTINATION_INPUTS")
+TASK_TYPE_VALIDATE_REPLICA_SOURCE_INPUTS = "VALIDATE_REPLICA_SOURCE_INPUTS"
+TASK_TYPE_VALIDATE_REPLICA_DESTINATION_INPUTS = (
+    "VALIDATE_REPLICA_DESTINATION_INPUTS")
 TASK_TYPE_VALIDATE_REPLICA_DEPLOYMENT_INPUTS = (
     "VALIDATE_REPLICA_DEPLOYMENT_INPUTS")
-TASK_TYPE_UPDATE_REPLICA = "UPDATE_REPLICA"
+TASK_TYPE_UPDATE_SOURCE_REPLICA = "UPDATE_SOURCE_REPLICA"
+TASK_TYPE_UPDATE_DESTINATION_REPLICA = "UPDATE_DESTINATION_REPLICA"
 
 PROVIDER_TYPE_IMPORT = 1
 PROVIDER_TYPE_EXPORT = 2

+ 12 - 6
coriolis/tasks/factory.py

@@ -24,8 +24,10 @@ _TASKS_MAP = {
         migration_tasks.CleanupFailedImportInstanceTask,
     constants.TASK_TYPE_GET_OPTIMAL_FLAVOR:
         migration_tasks.GetOptimalFlavorTask,
-    constants.TASK_TYPE_VALIDATE_MIGRATION_INPUTS:
-        migration_tasks.ValidateMigrationParametersTask,
+    constants.TASK_TYPE_VALIDATE_MIGRATION_SOURCE_INPUTS:
+        migration_tasks.ValidateMigrationSourceInputsTask,
+    constants.TASK_TYPE_VALIDATE_MIGRATION_DESTINATION_INPUTS:
+        migration_tasks.ValidateMigrationDestinationInputsTask,
     constants.TASK_TYPE_DEPLOY_OS_MORPHING_RESOURCES:
         osmorphing_tasks.DeployOSMorphingResourcesTask,
     constants.TASK_TYPE_OS_MORPHING:
@@ -62,12 +64,16 @@ _TASKS_MAP = {
         replica_tasks.DeleteReplicaDiskSnapshotsTask,
     constants.TASK_TYPE_RESTORE_REPLICA_DISK_SNAPSHOTS:
         replica_tasks.RestoreReplicaDiskSnapshotsTask,
-    constants.TASK_TYPE_VALIDATE_REPLICA_INPUTS:
-        replica_tasks.ValidateReplicaExecutionParametersTask,
+    constants.TASK_TYPE_VALIDATE_REPLICA_SOURCE_INPUTS:
+        replica_tasks.ValidateReplicaExecutionSourceInputsTask,
+    constants.TASK_TYPE_VALIDATE_REPLICA_DESTINATION_INPUTS:
+        replica_tasks.ValidateReplicaExecutionDestinationInputsTask,
     constants.TASK_TYPE_VALIDATE_REPLICA_DEPLOYMENT_INPUTS:
         replica_tasks.ValidateReplicaDeploymentParametersTask,
-    constants.TASK_TYPE_UPDATE_REPLICA:
-        replica_tasks.UpdateReplicaTask
+    constants.TASK_TYPE_UPDATE_SOURCE_REPLICA:
+        replica_tasks.UpdateSourceReplicaTask,
+    constants.TASK_TYPE_UPDATE_DESTINATION_REPLICA:
+        replica_tasks.UpdateDestinationReplicaTask
 }
 
 

+ 29 - 17
coriolis/tasks/migration_tasks.py

@@ -192,28 +192,25 @@ class GetOptimalFlavorTask(base.TaskRunner):
         return task_info
 
 
-class ValidateMigrationParametersTask(base.TaskRunner):
+class ValidateMigrationSourceInputsTask(base.TaskRunner):
     def run(self, ctxt, instance, origin, destination, task_info,
             event_handler):
         event_manager = events.EventManager(event_handler)
-        # validate source params:
-        origin_type = origin["type"]
         origin_connection_info = base.get_connection_info(ctxt, origin)
-        destination_connection_info = base.get_connection_info(
-            ctxt, destination)
-        destination_type = destination["type"]
+        origin_type = origin["type"]
+
         source_provider = providers_factory.get_provider(
             origin_type, constants.PROVIDER_TYPE_VALIDATE_MIGRATION_EXPORT,
             event_handler, raise_if_not_found=False)
         export_info = None
         if source_provider:
             export_info = source_provider.validate_migration_export_input(
-                ctxt, base.get_connection_info(ctxt, origin), instance,
+                ctxt, origin_connection_info, instance,
                 source_environment=origin.get("source_environment", {}))
         else:
             event_manager.progress_update(
-                "Migration Export Provider for platform '%s' does not support "
-                "Migration input validation" % origin_type)
+                "Migration Export Provider for platform '%s' does not "
+                "support Migration input validation" % origin_type)
 
         if export_info is None:
             source_endpoint_provider = providers_factory.get_provider(
@@ -222,9 +219,7 @@ class ValidateMigrationParametersTask(base.TaskRunner):
             if not source_endpoint_provider:
                 event_manager.progress_update(
                     "Migration Export Provider for platform '%s' does not "
-                    "support querying instance export info. Cannot perform "
-                    "Migration Import validation for destination platform "
-                    "'%s'" % (origin_type, destination_type))
+                    "support querying instance export info" % origin_type)
                 return task_info
             export_info = source_endpoint_provider.get_instance(
                 ctxt, origin_connection_info, instance)
@@ -232,25 +227,42 @@ class ValidateMigrationParametersTask(base.TaskRunner):
         # validate Export info:
         schemas.validate_value(
             export_info, schemas.CORIOLIS_VM_EXPORT_INFO_SCHEMA)
-        # NOTE: this export info will get overriden with updated values
+        # NOTE: this export info will get overridden with updated values
         # and disk paths after the ExportInstanceTask.
         task_info["export_info"] = export_info
 
-        # validate destination params:
+        return task_info
+
+
+class ValidateMigrationDestinationInputsTask(base.TaskRunner):
+    def run(self, ctxt, instance, origin, destination, task_info,
+            event_handler):
+        event_manager = events.EventManager(event_handler)
+        destination_type = destination["type"]
+        if task_info.get("export_info") is None:
+            event_manager.progress_update(
+                "Instance export info is not set. Cannot perform Migration "
+                "Import validation for destination platform "
+                "'%s'" % destination_type)
+            return task_info
+
+        destination_connection_info = base.get_connection_info(
+            ctxt, destination)
         destination_provider = providers_factory.get_provider(
             destination_type,
             constants.PROVIDER_TYPE_VALIDATE_MIGRATION_IMPORT, event_handler,
             raise_if_not_found=False)
         if not destination_provider:
             event_manager.progress_update(
-                "Migration Import Provider for platform '%s' does not support "
-                "Migration input validation" % destination_type)
+                "Migration Import Provider for platform '%s' does not "
+                "support Migration input validation" % destination_type)
             return task_info
 
         # NOTE: the target environment JSON schema should have been validated
         # upon accepting the Migration API creation request.
         target_environment = destination.get("target_environment", {})
         destination_provider.validate_migration_import_input(
-            ctxt, destination_connection_info, target_environment, export_info)
+            ctxt, destination_connection_info, target_environment,
+            task_info["export_info"])
 
         return task_info

+ 88 - 84
coriolis/tasks/replica_tasks.py

@@ -339,51 +339,35 @@ class RestoreReplicaDiskSnapshotsTask(base.TaskRunner):
         return task_info
 
 
-class ValidateReplicaExecutionParametersTask(base.TaskRunner):
+class ValidateReplicaExecutionSourceInputsTask(base.TaskRunner):
     def run(self, ctxt, instance, origin, destination, task_info,
             event_handler):
         event_manager = events.EventManager(event_handler)
-        # validate source params:
         origin_type = origin["type"]
-        origin_connection_info = base.get_connection_info(ctxt, origin)
-        destination_connection_info = base.get_connection_info(
-            ctxt, destination)
-        destination_type = destination["type"]
         source_provider = providers_factory.get_provider(
             origin_type, constants.PROVIDER_TYPE_VALIDATE_REPLICA_EXPORT,
             event_handler, raise_if_not_found=False)
-        export_info = None
-        if source_provider:
-            export_info = source_provider.validate_replica_export_input(
-                ctxt, base.get_connection_info(ctxt, origin), instance,
-                source_environment=origin.get("source_environment", {}))
-        else:
+        origin_connection_info = base.get_connection_info(ctxt, origin)
+        if not source_provider:
             event_manager.progress_update(
                 "Replica Export Provider for platform '%s' does not support "
                 "Replica input validation" % origin_type)
+        else:
+            source_provider.validate_replica_export_input(
+                ctxt, origin_connection_info, instance,
+                source_environment=origin.get("source_environment", {}))
 
-        if export_info is None:
-            source_endpoint_provider = providers_factory.get_provider(
-                origin_type, constants.PROVIDER_TYPE_ENDPOINT_INSTANCES,
-                event_handler, raise_if_not_found=False)
-            if not source_endpoint_provider:
-                event_manager.progress_update(
-                    "Replica Export Provider for platform '%s' does not "
-                    "support querying instance export info. Cannot perform "
-                    "Replica Import validation for destination platform "
-                    "'%s'" % (origin_type, destination_type))
-                return task_info
-            export_info = source_endpoint_provider.get_instance(
-                ctxt, origin_connection_info, instance)
+        return task_info
 
-        # validate Export info:
-        schemas.validate_value(
-            export_info, schemas.CORIOLIS_VM_EXPORT_INFO_SCHEMA)
-        # NOTE: this export info will get overriden with updated values
-        # and disk paths after the ExportInstanceTask.
-        task_info["export_info"] = export_info
 
-        # validate destination params:
+class ValidateReplicaExecutionDestinationInputsTask(base.TaskRunner):
+    def run(self, ctxt, instance, origin, destination, task_info,
+            event_handler):
+        event_manager = events.EventManager(event_handler)
+        destination_type = destination["type"]
+
+        destination_connection_info = base.get_connection_info(
+            ctxt, destination)
         destination_provider = providers_factory.get_provider(
             destination_type,
             constants.PROVIDER_TYPE_VALIDATE_REPLICA_IMPORT, event_handler,
@@ -394,11 +378,19 @@ class ValidateReplicaExecutionParametersTask(base.TaskRunner):
                 "Replica input validation" % destination_type)
             return task_info
 
+        export_info = task_info.get("export_info")
+        if not export_info:
+            raise exception.CoriolisException(
+                "Instance export info is not set. Cannot perform "
+                "Replica Import validation for destination platform "
+                "'%s'" % destination_type)
+
         # NOTE: the target environment JSON schema should have been validated
         # upon accepting the Replica API creation request.
         target_environment = destination.get("target_environment", {})
         destination_provider.validate_replica_import_input(
-            ctxt, destination_connection_info, target_environment, export_info)
+            ctxt, destination_connection_info, target_environment,
+            export_info)
 
         return task_info
 
@@ -436,62 +428,74 @@ class ValidateReplicaDeploymentParametersTask(base.TaskRunner):
         return task_info
 
 
-class UpdateReplicaTask(base.TaskRunner):
+class UpdateSourceReplicaTask(base.TaskRunner):
     def run(self, ctxt, instance, origin, destination, task_info,
             event_handler):
-        destination_provider = None
-        source_provider = None
-        new_source_environment = task_info.get('source_environment')
-        new_destination_environment = task_info.get('destination_environment')
-
-        if new_source_environment:
-            source_provider = providers_factory.get_provider(
-                origin["type"], constants.PROVIDER_TYPE_REPLICA_UPDATE,
-                event_handler, raise_if_not_found=False)
-            if not source_provider:
-                raise exception.CoriolisException(
-                    "Replica source provider plugin for '%s' does not support"
-                    " updating Replicas." % origin["type"])
-
-        if new_destination_environment:
-            destination_provider = providers_factory.get_provider(
-                destination["type"], constants.PROVIDER_TYPE_REPLICA_UPDATE,
-                event_handler, raise_if_not_found=False)
-            if not destination_provider:
-                raise exception.CoriolisException(
-                    "Replica destination provider plugin for '%s' does not "
-                    "support updating Replicas." % destination["type"])
+        event_manager = events.EventManager(event_handler)
+        new_source_env = task_info.get('source_environment', {})
+        if not new_source_env:
+            event_manager.progress_update(
+                "No new source environment options provided")
+            return task_info
 
-        connection_info = base.get_connection_info(ctxt, destination)
+        source_provider = providers_factory.get_provider(
+            origin["type"], constants.PROVIDER_TYPE_REPLICA_UPDATE,
+            event_handler, raise_if_not_found=False)
+        if not source_provider:
+            raise exception.CoriolisException(
+                "Replica source provider plugin for '%s' does not support"
+                " updating Replicas" % origin["type"])
+
+        origin_connection_info = base.get_connection_info(ctxt, origin)
+        export_info = task_info.get("export_info", {})
+        volumes_info = task_info.get("volumes_info", {})
+
+        LOG.info("Checking source provider environment params")
+        # NOTE: the `source_environment` in the `origin` is the one set
+        # in the dedicated DB column of the Replica and thus stores
+        # the previous value of it:
+        old_source_env = origin.get('source_environment', {})
+        volumes_info = source_provider.check_update_environment_params(
+            ctxt, origin_connection_info, export_info, volumes_info,
+            old_source_env, new_source_env)
+
+        task_info['volumes_info'] = volumes_info
+
+        return task_info
+
+
+class UpdateDestinationReplicaTask(base.TaskRunner):
+    def run(self, ctxt, instance, origin, destination, task_info,
+            event_handler):
+        event_manager = events.EventManager(event_handler)
+        new_destination_env = task_info.get('destination_environment', {})
+        if not new_destination_env:
+            event_manager.progress_update(
+                "No new destination environment options provided")
+            return task_info
+
+        destination_provider = providers_factory.get_provider(
+            destination["type"], constants.PROVIDER_TYPE_REPLICA_UPDATE,
+            event_handler, raise_if_not_found=False)
+        if not destination_provider:
+            raise exception.CoriolisException(
+                "Replica destination provider plugin for '%s' does not "
+                "support updating Replicas" % destination["type"])
+
+        destination_connection_info = base.get_connection_info(
+            ctxt, destination)
         export_info = task_info.get("export_info", {})
         volumes_info = task_info.get("volumes_info", {})
 
-        if source_provider:
-            LOG.info("Checking source provider environment params")
-            # NOTE: the `source_environment` in the `origin` is the one set
-            # in the dedicated DB column of the Replica and thus stores
-            # the previous value of it:
-            old_source_environment = origin.get('source_environment', {})
-            new_source_environment = task_info.get('source_environment', {})
-            source_provider.check_update_environment_params(
-                ctxt, connection_info, export_info, volumes_info,
-                old_source_environment, new_source_environment)
-
-        if destination_provider:
-            LOG.info("Checking destination provider environment params")
-            # NOTE: the `target_environment` in the `destination` is the one
-            # set in the dedicated DB column of the Replica and thus stores
-            # the previous value of it:
-            old_destination_environment = destination.get(
-                'target_environment', {})
-            new_destination_environment = task_info.get(
-                'destination_environment', {})
-
-            volumes_info = (
-                destination_provider.check_update_environment_params(
-                    ctxt, connection_info, export_info, volumes_info,
-                    old_destination_environment, new_destination_environment))
-
-            task_info['volumes_info'] = volumes_info
+        LOG.info("Checking destination provider environment params")
+        # NOTE: the `target_environment` in the `destination` is the one
+        # set in the dedicated DB column of the Replica and thus stores
+        # the previous value of it:
+        old_destination_env = destination.get('target_environment', {})
+        volumes_info = destination_provider.check_update_environment_params(
+            ctxt, destination_connection_info, export_info, volumes_info,
+            old_destination_env, new_destination_env)
+
+        task_info['volumes_info'] = volumes_info
 
         return task_info