Jelajahi Sumber

Redefines Migration Tasks for Replica-backed-Migrations.

This patch will redefine most of the migration tasks to inherit
from their replica tasks counterparts.
Daniel Vincze 6 tahun lalu
induk
melakukan
c23137a333

+ 36 - 36
coriolis/conductor/rpc/server.py

@@ -732,34 +732,34 @@ class ConductorServerEndpoint(object):
                 instance, constants.TASK_TYPE_GET_INSTANCE_INFO,
                 execution)
 
-            validate_replica_source_inputs_task = self._create_task(
+            validate_migration_source_inputs_task = self._create_task(
                 instance,
-                constants.TASK_TYPE_VALIDATE_REPLICA_SOURCE_INPUTS,
+                constants.TASK_TYPE_VALIDATE_MIGRATION_SOURCE_INPUTS,
                 execution)
 
-            validate_replica_destination_inputs_task = self._create_task(
+            validate_migration_destination_inputs_task = self._create_task(
                 instance,
-                constants.TASK_TYPE_VALIDATE_REPLICA_DESTINATION_INPUTS,
+                constants.TASK_TYPE_VALIDATE_MIGRATION_DESTINATION_INPUTS,
                 execution,
                 depends_on=[get_instance_info_task.id])
 
             depends_on = [
-                validate_replica_source_inputs_task.id,
-                validate_replica_destination_inputs_task.id]
+                validate_migration_source_inputs_task.id,
+                validate_migration_destination_inputs_task.id]
 
-            deploy_replica_disks_task = self._create_task(
-                instance, constants.TASK_TYPE_DEPLOY_REPLICA_DISKS,
+            create_instance_disks_task = self._create_task(
+                instance, constants.TASK_TYPE_CREATE_INSTANCE_DISKS,
                 execution, depends_on=depends_on)
 
-            deploy_replica_source_resources_task = self._create_task(
+            deploy_migration_source_resources_task = self._create_task(
                 instance,
-                constants.TASK_TYPE_DEPLOY_REPLICA_SOURCE_RESOURCES,
-                execution, depends_on=[deploy_replica_disks_task.id])
+                constants.TASK_TYPE_DEPLOY_MIGRATION_SOURCE_RESOURCES,
+                execution, depends_on=[create_instance_disks_task.id])
 
-            deploy_replica_target_resources_task = self._create_task(
+            deploy_migration_target_resources_task = self._create_task(
                 instance,
-                constants.TASK_TYPE_DEPLOY_REPLICA_TARGET_RESOURCES,
-                execution, depends_on=[deploy_replica_disks_task.id])
+                constants.TASK_TYPE_DEPLOY_MIGRATION_TARGET_RESOURCES,
+                execution, depends_on=[create_instance_disks_task.id])
 
             # NOTE(aznashwan): re-executing the REPLICATE_DISKS task only works
             # if all the source disk snapshotting and worker setup steps are
@@ -768,48 +768,48 @@ class ConductorServerEndpoint(object):
             # Alternatively, if the DEPLOY_REPLICA_SOURCE/DEST_RESOURCES tasks
             # will no longer have a state conflict, iterating through and
             # re-executing DEPLOY_REPLICA_SOURCE_RESOURCES will be required:
-            last_replica_task = None
-            replica_resources_tasks = [
-                deploy_replica_source_resources_task.id,
-                deploy_replica_target_resources_task.id]
+            last_migration_task = None
+            migration_resources_tasks = [
+                deploy_migration_source_resources_task.id,
+                deploy_migration_target_resources_task.id]
             for i in range(migration.replication_count):
                 # insert SHUTDOWN_INSTANCES task before the last sync:
                 if i == (migration.replication_count - 1) and (
                         migration.shutdown_instances):
-                    shutdown_deps = replica_resources_tasks
-                    if last_replica_task:
-                        shutdown_deps = [last_replica_task.id]
-                    last_replica_task = self._create_task(
+                    shutdown_deps = migration_resources_tasks
+                    if last_migration_task:
+                        shutdown_deps = [last_migration_task.id]
+                    last_migration_task = self._create_task(
                         instance, constants.TASK_TYPE_SHUTDOWN_INSTANCE,
                         execution, depends_on=shutdown_deps)
 
-                replication_deps = replica_resources_tasks
-                if last_replica_task:
-                    replication_deps = [last_replica_task.id]
+                replication_deps = migration_resources_tasks
+                if last_migration_task:
+                    replication_deps = [last_migration_task.id]
 
-                last_replica_task = self._create_task(
+                last_migration_task = self._create_task(
                     instance, constants.TASK_TYPE_REPLICATE_DISKS,
                     execution, depends_on=replication_deps)
 
             delete_source_resources_task = self._create_task(
                 instance,
-                constants.TASK_TYPE_DELETE_REPLICA_SOURCE_RESOURCES,
-                execution, depends_on=[last_replica_task.id],
+                constants.TASK_TYPE_DELETE_MIGRATION_SOURCE_RESOURCES,
+                execution, depends_on=[last_migration_task.id],
                 on_error=True)
 
             delete_destination_resources_task = self._create_task(
                 instance,
-                constants.TASK_TYPE_DELETE_REPLICA_TARGET_RESOURCES,
-                execution, depends_on=[last_replica_task.id],
+                constants.TASK_TYPE_DELETE_MIGRATION_TARGET_RESOURCES,
+                execution, depends_on=[last_migration_task.id],
                 on_error=True)
 
-            deploy_replica_task = self._create_task(
-                instance, constants.TASK_TYPE_DEPLOY_REPLICA_INSTANCE,
+            deploy_instance_resources_task = self._create_task(
+                instance, constants.TASK_TYPE_DEPLOY_INSTANCE_RESOURCES,
                 execution, depends_on=[
                     delete_source_resources_task.id,
                     delete_destination_resources_task.id])
 
-            last_task = deploy_replica_task
+            last_task = deploy_instance_resources_task
             if not skip_os_morphing:
                 task_deploy_os_morphing_resources = self._create_task(
                     instance, constants.TASK_TYPE_DEPLOY_OS_MORPHING_RESOURCES,
@@ -836,16 +836,16 @@ class ConductorServerEndpoint(object):
 
             self._create_task(
                 instance,
-                constants.TASK_TYPE_FINALIZE_REPLICA_INSTANCE_DEPLOYMENT,
+                constants.TASK_TYPE_FINALIZE_INSTANCE_DEPLOYMENT,
                 execution, depends_on=[last_task.id])
 
             self._create_task(
                 instance,
-                constants.TASK_TYPE_CLEANUP_FAILED_REPLICA_INSTANCE_DEPLOYMENT,
+                constants.TASK_TYPE_CLEANUP_FAILED_INSTANCE_DEPLOYMENT,
                 execution, on_error=True)
 
             self._create_task(
-                instance, constants.TASK_TYPE_DELETE_REPLICA_DISKS,
+                instance, constants.TASK_TYPE_CLEANUP_INSTANCE_STORAGE,
                 execution, on_error=True)
 
         db_api.add_migration(ctxt, migration)

+ 15 - 0
coriolis/constants.py

@@ -22,6 +22,21 @@ TASK_TYPE_DEPLOY_DISK_COPY_RESOURCES = "DEPLOY_DISK_COPY_RESOURCES"
 TASK_TYPE_COPY_DISK_DATA = "COPY_DISK_DATA"
 TASK_TYPE_DELETE_DISK_COPY_RESOURCES = "DELETE_DISK_COPY_RESOURCES"
 
+TASK_TYPE_DEPLOY_MIGRATION_SOURCE_RESOURCES = (
+    "DEPLOY_MIGRATION_SOURCE_RESOURCES")
+TASK_TYPE_DEPLOY_MIGRATION_TARGET_RESOURCES = (
+    "DEPLOY_MIGRATION_TARGET_RESOURCES")
+TASK_TYPE_DELETE_MIGRATION_SOURCE_RESOURCES = (
+    "DELETE_MIGRATION_SOURCE_RESOURCES")
+TASK_TYPE_DELETE_MIGRATION_TARGET_RESOURCES = (
+    "DELETE_MIGRATION_TARGET_RESOURCES")
+TASK_TYPE_DEPLOY_INSTANCE_RESOURCES = "DEPLOY_INSTANCE_RESOURCES"
+TASK_TYPE_FINALIZE_INSTANCE_DEPLOYMENT = "FINALIZE_INSTANCE_DEPLOYMENT"
+TASK_TYPE_CLEANUP_FAILED_INSTANCE_DEPLOYMENT = (
+    "CLEANUP_FAILED_INSTANCE_DEPLOYMENT")
+TASK_TYPE_CLEANUP_INSTANCE_STORAGE = "CLEANUP_INSTANCE_STORAGE"
+
+TASK_TYPE_CREATE_INSTANCE_DISKS = "CREATE_INSTANCE_DISKS"
 
 TASK_TYPE_DEPLOY_OS_MORPHING_RESOURCES = "DEPLOY_OS_MORPHING_RESOURCES"
 TASK_TYPE_OS_MORPHING = "OS_MORPHING"

+ 18 - 0
coriolis/tasks/factory.py

@@ -22,6 +22,24 @@ _TASKS_MAP = {
         migration_tasks.DeleteDiskCopyResources,
     constants.TASK_TYPE_CLEANUP_FAILED_IMPORT_INSTANCE:
         migration_tasks.CleanupFailedImportInstanceTask,
+    constants.TASK_TYPE_DEPLOY_MIGRATION_SOURCE_RESOURCES:
+        migration_tasks.DeployMigrationSourceResourcesTask,
+    constants.TASK_TYPE_DEPLOY_MIGRATION_TARGET_RESOURCES:
+        migration_tasks.DeployMigrationTargetResourcesTask,
+    constants.TASK_TYPE_DELETE_MIGRATION_SOURCE_RESOURCES:
+        migration_tasks.DeleteMigrationSourceResourcesTask,
+    constants.TASK_TYPE_DELETE_MIGRATION_TARGET_RESOURCES:
+        migration_tasks.DeleteMigrationTargetResourcesTask,
+    constants.TASK_TYPE_DEPLOY_INSTANCE_RESOURCES:
+        migration_tasks.DeployInstanceResourcesTask,
+    constants.TASK_TYPE_FINALIZE_INSTANCE_DEPLOYMENT:
+        migration_tasks.FinalizeInstanceDeploymentTask,
+    constants.TASK_TYPE_CREATE_INSTANCE_DISKS:
+        migration_tasks.CreateInstanceDisksTask,
+    constants.TASK_TYPE_CLEANUP_FAILED_INSTANCE_DEPLOYMENT:
+        migration_tasks.CleanupFailedInstanceDeploymentTask,
+    constants.TASK_TYPE_CLEANUP_INSTANCE_STORAGE:
+        migration_tasks.CleanupInstanceStorageTask,
     constants.TASK_TYPE_GET_OPTIMAL_FLAVOR:
         migration_tasks.GetOptimalFlavorTask,
     constants.TASK_TYPE_VALIDATE_MIGRATION_SOURCE_INPUTS:

+ 123 - 70
coriolis/tasks/migration_tasks.py

@@ -10,6 +10,7 @@ from coriolis.migrations import manager
 from coriolis.providers import factory as providers_factory
 from coriolis import schemas
 from coriolis.tasks import base
+from coriolis.tasks import replica_tasks
 
 LOG = logging.getLogger(__name__)
 
@@ -212,77 +213,129 @@ class GetOptimalFlavorTask(base.TaskRunner):
         return task_info
 
 
-class ValidateMigrationSourceInputsTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
-        event_manager = events.EventManager(event_handler)
-        origin_connection_info = base.get_connection_info(ctxt, origin)
-        origin_type = origin["type"]
-
-        source_provider = providers_factory.get_provider(
-            origin_type, constants.PROVIDER_TYPE_VALIDATE_MIGRATION_EXPORT,
-            event_handler, raise_if_not_found=False)
-        source_environment = origin.get("source_environment", {})
-        export_info = None
-        if source_provider:
-            export_info = source_provider.validate_migration_export_input(
-                ctxt, origin_connection_info, instance, source_environment)
-        else:
-            event_manager.progress_update(
-                "Migration Export Provider for platform '%s' does not "
-                "support Migration input validation" % origin_type)
-
-        if export_info is None:
-            source_endpoint_provider = providers_factory.get_provider(
-                origin_type, constants.PROVIDER_TYPE_ENDPOINT_INSTANCES,
-                event_handler, raise_if_not_found=False)
-            if not source_endpoint_provider:
-                event_manager.progress_update(
-                    "Migration Export Provider for platform '%s' does not "
-                    "support querying instance export info" % origin_type)
-                return task_info
-            export_info = source_endpoint_provider.get_instance(
-                ctxt, origin_connection_info, source_environment, instance)
-
-        # validate Export info:
-        schemas.validate_value(
-            export_info, schemas.CORIOLIS_VM_EXPORT_INFO_SCHEMA)
-        # NOTE: this export info will get overridden with updated values
-        # and disk paths after the ExportInstanceTask.
-        task_info["export_info"] = export_info
+# class ValidateMigrationSourceInputsTask(base.TaskRunner):
+    # def run(self, ctxt, instance, origin, destination, task_info,
+    #         event_handler):
+    #     event_manager = events.EventManager(event_handler)
+    #     origin_connection_info = base.get_connection_info(ctxt, origin)
+    #     origin_type = origin["type"]
+
+    #     source_provider = providers_factory.get_provider(
+    #         origin_type, constants.PROVIDER_TYPE_VALIDATE_MIGRATION_EXPORT,
+    #         event_handler, raise_if_not_found=False)
+    #     source_environment = origin.get("source_environment", {})
+    #     export_info = None
+    #     if source_provider:
+    #         export_info = source_provider.validate_migration_export_input(
+    #             ctxt, origin_connection_info, instance, source_environment)
+    #     else:
+    #         event_manager.progress_update(
+    #             "Migration Export Provider for platform '%s' does not "
+    #             "support Migration input validation" % origin_type)
 
-        return task_info
+    #     if export_info is None:
+    #         source_endpoint_provider = providers_factory.get_provider(
+    #             origin_type, constants.PROVIDER_TYPE_ENDPOINT_INSTANCES,
+    #             event_handler, raise_if_not_found=False)
+    #         if not source_endpoint_provider:
+    #             event_manager.progress_update(
+    #                 "Migration Export Provider for platform '%s' does not "
+    #                 "support querying instance export info" % origin_type)
+    #             return task_info
+    #         export_info = source_endpoint_provider.get_instance(
+    #             ctxt, origin_connection_info, source_environment, instance)
 
+    #     # validate Export info:
+    #     schemas.validate_value(
+    #         export_info, schemas.CORIOLIS_VM_EXPORT_INFO_SCHEMA)
+    #     # NOTE: this export info will get overridden with updated values
+    #     # and disk paths after the ExportInstanceTask.
+    #     task_info["export_info"] = export_info
 
-class ValidateMigrationDestinationInputsTask(base.TaskRunner):
-    def run(self, ctxt, instance, origin, destination, task_info,
-            event_handler):
-        event_manager = events.EventManager(event_handler)
-        destination_type = destination["type"]
-        if task_info.get("export_info") is None:
-            event_manager.progress_update(
-                "Instance export info is not set. Cannot perform Migration "
-                "Import validation for destination platform "
-                "'%s'" % destination_type)
-            return task_info
-
-        destination_connection_info = base.get_connection_info(
-            ctxt, destination)
-        destination_provider = providers_factory.get_provider(
-            destination_type,
-            constants.PROVIDER_TYPE_VALIDATE_MIGRATION_IMPORT, event_handler,
-            raise_if_not_found=False)
-        if not destination_provider:
-            event_manager.progress_update(
-                "Migration Import Provider for platform '%s' does not "
-                "support Migration input validation" % destination_type)
-            return task_info
-
-        # NOTE: the target environment JSON schema should have been validated
-        # upon accepting the Migration API creation request.
-        target_environment = destination.get("target_environment", {})
-        destination_provider.validate_migration_import_input(
-            ctxt, destination_connection_info, target_environment,
-            task_info["export_info"])
+    #     return task_info
 
-        return task_info
+
+# class ValidateMigrationDestinationInputsTask(base.TaskRunner):
+    # def run(self, ctxt, instance, origin, destination, task_info,
+    #         event_handler):
+    #     event_manager = events.EventManager(event_handler)
+    #     destination_type = destination["type"]
+    #     if task_info.get("export_info") is None:
+    #         event_manager.progress_update(
+    #             "Instance export info is not set. Cannot perform Migration "
+    #             "Import validation for destination platform "
+    #             "'%s'" % destination_type)
+    #         return task_info
+
+    #     destination_connection_info = base.get_connection_info(
+    #         ctxt, destination)
+    #     destination_provider = providers_factory.get_provider(
+    #         destination_type,
+    #         constants.PROVIDER_TYPE_VALIDATE_MIGRATION_IMPORT, event_handler,
+    #         raise_if_not_found=False)
+    #     if not destination_provider:
+    #         event_manager.progress_update(
+    #             "Migration Import Provider for platform '%s' does not "
+    #             "support Migration input validation" % destination_type)
+    #         return task_info
+
+    #     # NOTE: the target environment JSON schema should have been validated
+    #     # upon accepting the Migration API creation request.
+    #     target_environment = destination.get("target_environment", {})
+    #     destination_provider.validate_migration_import_input(
+    #         ctxt, destination_connection_info, target_environment,
+    #         task_info["export_info"])
+
+    #     return task_info
+
+class DeployMigrationSourceResourcesTask(
+        replica_tasks.DeployReplicaSourceResourcesTask):
+    pass
+
+
+class DeployMigrationTargetResourcesTask(
+        replica_tasks.DeployReplicaTargetResourcesTask):
+    pass
+
+
+class CreateInstanceDisksTask(
+        replica_tasks.DeployReplicaDisksTask):
+    pass
+
+
+class CleanupInstanceStorageTask(replica_tasks.DeleteReplicaDisksTask):
+    pass
+
+
+class FinalizeInstanceDeploymentTask(
+        replica_tasks.FinalizeReplicaInstanceDeploymentTask):
+    pass
+
+
+class CleanupFailedInstanceDeploymentTask(
+        replica_tasks.CleanupFailedReplicaInstanceDeploymentTask):
+    pass
+
+
+class ValidateMigrationSourceInputsTask(
+        replica_tasks.ValidateReplicaExecutionSourceInputsTask):
+    pass
+
+
+class ValidateMigrationDestinationInputsTask(
+        replica_tasks.ValidateReplicaExecutionDestinationInputsTask):
+    pass
+
+
+class DeleteMigrationSourceResourcesTask(
+        replica_tasks.DeleteReplicaSourceResourcesTask):
+    pass
+
+
+class DeleteMigrationTargetResourcesTask(
+        replica_tasks.DeleteReplicaTargetResourcesTask):
+    pass
+
+
+class DeployInstanceResourcesTask(replica_tasks.DeployReplicaInstanceTask):
+    pass