Source index

Merge pull request #77 from aznashwan/replica-migrations

Switch to using Replica logic for Migrations.
Nashwan Azhari 6 years ago
parent
commit
076a8ecd55

+ 17 - 4
coriolis/api/v1/migrations.py

@@ -54,6 +54,13 @@ class MigrationController(api_wsgi.Controller):
             instances = migration["instances"]
             notes = migration.get("notes")
             skip_os_morphing = migration.get("skip_os_morphing", False)
+            shutdown_instances = migration.get(
+                "shutdown_instances", False)
+            replication_count = int(migration.get("replication_count", 2))
+            if replication_count not in range(1, 11):
+                raise ValueError(
+                    "'replication_count' must be an integer between 1 and 10."
+                    " Got: %s" % replication_count)
 
             source_environment = migration.get("source_environment", {})
             self._endpoints_api.validate_source_environment(
@@ -79,7 +86,8 @@ class MigrationController(api_wsgi.Controller):
 
             return (origin_endpoint_id, destination_endpoint_id,
                     source_environment, destination_environment, instances,
-                    notes, skip_os_morphing, network_map, storage_mappings)
+                    notes, skip_os_morphing, replication_count,
+                    shutdown_instances, network_map, storage_mappings)
         except Exception as ex:
             LOG.exception(ex)
             msg = getattr(ex, "message", str(ex))
@@ -108,13 +116,18 @@ class MigrationController(api_wsgi.Controller):
              destination_environment,
              instances,
              notes,
-             skip_os_morphing, network_map,
+             skip_os_morphing,
+             replication_count,
+             shutdown_instances,
+             network_map,
              storage_mappings) = self._validate_migration_input(
-                context, migration_body)
+                 context, migration_body)
             migration = self._migration_api.migrate_instances(
                 context, origin_endpoint_id, destination_endpoint_id,
                 source_environment, destination_environment, instances,
-                network_map, storage_mappings, notes, skip_os_morphing)
+                network_map, storage_mappings, replication_count,
+                shutdown_instances, notes=notes,
+                skip_os_morphing=skip_os_morphing)
 
         return migration_view.single(req, migration)
 

+ 9 - 4
coriolis/api/v1/views/replica_tasks_execution_view.py

@@ -4,11 +4,15 @@
 import itertools
 
 from oslo_config import cfg as conf
+from oslo_log import log as logging
 
 from coriolis import constants
 from coriolis import utils
 
 
+LOG = logging.getLogger(__name__)
+
+
 REPLICA_EXECUTION_API_OPTS = [
     conf.BoolOpt("include_task_info_in_replica_executions_api",
                  default=False,
@@ -38,10 +42,11 @@ def _sort_tasks(tasks):
                         constants.TASK_STATUS_ON_ERROR_ONLY and
                         t not in non_error_only_tasks]
 
-    sorted_tasks = utils.topological_graph_sorting(
-        non_error_only_tasks, sort_key="task_type")
-    sorted_tasks += utils.topological_graph_sorting(
-        error_only_tasks, sort_key="task_type")
+    sorted_tasks = sorted(
+        non_error_only_tasks, key=lambda t: t.get('index', 0))
+    sorted_tasks.extend(sorted(
+        error_only_tasks, key=lambda t: t.get('index', 0)))
+
     return sorted_tasks
 
 

+ 4 - 1
coriolis/conductor/rpc/client.py

@@ -194,7 +194,8 @@ class ConductorClient(object):
     def migrate_instances(self, ctxt, origin_endpoint_id,
                           destination_endpoint_id, source_environment,
                           destination_environment, instances, network_map,
-                          storage_mappings, notes=None,
+                          storage_mappings, replication_count,
+                          shutdown_instances=False, notes=None,
                           skip_os_morphing=False):
         return self._client.call(
             ctxt, 'migrate_instances',
@@ -203,6 +204,8 @@ class ConductorClient(object):
             destination_environment=destination_environment,
             instances=instances,
             notes=notes,
+            replication_count=replication_count,
+            shutdown_instances=shutdown_instances,
             skip_os_morphing=skip_os_morphing,
             network_map=network_map,
             storage_mappings=storage_mappings,

+ 112 - 42
coriolis/conductor/rpc/server.py

@@ -272,6 +272,7 @@ class ConductorServerEndpoint(object):
         task.task_type = task_type
         task.depends_on = depends_on
         task.on_error = on_error
+        task.index = len(task.execution.tasks) + 1
 
         if not on_error:
             task.status = constants.TASK_STATUS_PENDING
@@ -690,8 +691,9 @@ class ConductorServerEndpoint(object):
     def migrate_instances(self, ctxt, origin_endpoint_id,
                           destination_endpoint_id, source_environment,
                           destination_environment, instances, network_map,
-                          storage_mappings, skip_os_morphing=False,
-                          notes=None):
+                          storage_mappings, replication_count,
+                          shutdown_instances=False, notes=None,
+                          skip_os_morphing=False):
         origin_endpoint = self.get_endpoint(ctxt, origin_endpoint_id)
         destination_endpoint = self.get_endpoint(ctxt, destination_endpoint_id)
         self._check_endpoints(ctxt, origin_endpoint, destination_endpoint)
@@ -715,55 +717,103 @@ class ConductorServerEndpoint(object):
         migration.instances = instances
         migration.info = {}
         migration.notes = notes
+        migration.shutdown_instances = shutdown_instances
+        migration.replication_count = replication_count
 
         self._check_create_reservation_for_transfer(
             migration, licensing_client.RESERVATION_TYPE_MIGRATION)
 
         for instance in instances:
-            task_validate_source = self._create_task(
+            # NOTE: we must explicitly set this in each VM's info
+            # to prevent the Replica disks from being cloned:
+            migration.info[instance] = {"clone_disks": False}
+
+            get_instance_info_task = self._create_task(
+                instance, constants.TASK_TYPE_GET_INSTANCE_INFO,
+                execution)
+
+            validate_replica_source_inputs_task = self._create_task(
                 instance,
-                constants.TASK_TYPE_VALIDATE_MIGRATION_SOURCE_INPUTS,
+                constants.TASK_TYPE_VALIDATE_REPLICA_SOURCE_INPUTS,
                 execution)
 
-            task_validate_destination = self._create_task(
+            validate_replica_destination_inputs_task = self._create_task(
                 instance,
-                constants.TASK_TYPE_VALIDATE_MIGRATION_DESTINATION_INPUTS,
+                constants.TASK_TYPE_VALIDATE_REPLICA_DESTINATION_INPUTS,
                 execution,
-                depends_on=[task_validate_source.id])
+                depends_on=[get_instance_info_task.id])
 
-            task_export = self._create_task(
-                instance, constants.TASK_TYPE_EXPORT_INSTANCE, execution,
-                depends_on=[task_validate_destination.id])
+            depends_on = [
+                validate_replica_source_inputs_task.id,
+                validate_replica_destination_inputs_task.id]
 
-            if (constants.PROVIDER_TYPE_INSTANCE_FLAVOR in
-                    destination_provider_types):
-                get_optimal_flavor_task = self._create_task(
-                    instance, constants.TASK_TYPE_GET_OPTIMAL_FLAVOR,
-                    execution, depends_on=[task_export.id])
-                next_task = get_optimal_flavor_task.id
-            else:
-                next_task = task_export.id
+            deploy_replica_disks_task = self._create_task(
+                instance, constants.TASK_TYPE_DEPLOY_REPLICA_DISKS,
+                execution, depends_on=depends_on)
 
-            task_import = self._create_task(
-                instance, constants.TASK_TYPE_IMPORT_INSTANCE,
-                execution, depends_on=[next_task])
+            deploy_replica_source_resources_task = self._create_task(
+                instance,
+                constants.TASK_TYPE_DEPLOY_REPLICA_SOURCE_RESOURCES,
+                execution, depends_on=[deploy_replica_disks_task.id])
 
-            task_deploy_disk_copy_resources = self._create_task(
-                instance, constants.TASK_TYPE_DEPLOY_DISK_COPY_RESOURCES,
-                execution, depends_on=[task_import.id])
+            deploy_replica_target_resources_task = self._create_task(
+                instance,
+                constants.TASK_TYPE_DEPLOY_REPLICA_TARGET_RESOURCES,
+                execution, depends_on=[deploy_replica_disks_task.id])
 
-            task_copy_disk = self._create_task(
-                instance, constants.TASK_TYPE_COPY_DISK_DATA,
-                execution, depends_on=[task_deploy_disk_copy_resources.id])
+            # NOTE(aznashwan): re-executing the REPLICATE_DISKS task only works
+            # if all the source disk snapshotting and worker setup steps are
+            # performed by the source plugin in REPLICATE_DISKS.
+            # This should no longer be a problem when worker pooling lands.
+            # Alternatively, if the DEPLOY_REPLICA_SOURCE/DEST_RESOURCES tasks
+            # will no longer have a state conflict, iterating through and
+            # re-executing DEPLOY_REPLICA_SOURCE_RESOURCES will be required:
+            last_replica_task = None
+            replica_resources_tasks = [
+                deploy_replica_source_resources_task.id,
+                deploy_replica_target_resources_task.id]
+            for i in range(migration.replication_count):
+                # insert SHUTDOWN_INSTANCES task before the last sync:
+                if i == (migration.replication_count - 1) and (
+                        migration.shutdown_instances):
+                    shutdown_deps = replica_resources_tasks
+                    if last_replica_task:
+                        shutdown_deps = [last_replica_task.id]
+                    last_replica_task = self._create_task(
+                        instance, constants.TASK_TYPE_SHUTDOWN_INSTANCE,
+                        execution, depends_on=shutdown_deps)
+
+                replication_deps = replica_resources_tasks
+                if last_replica_task:
+                    replication_deps = [last_replica_task.id]
+
+                last_replica_task = self._create_task(
+                    instance, constants.TASK_TYPE_REPLICATE_DISKS,
+                    execution, depends_on=replication_deps)
+
+            delete_source_resources_task = self._create_task(
+                instance,
+                constants.TASK_TYPE_DELETE_REPLICA_SOURCE_RESOURCES,
+                execution, depends_on=[last_replica_task.id],
+                on_error=True)
+
+            delete_destination_resources_task = self._create_task(
+                instance,
+                constants.TASK_TYPE_DELETE_REPLICA_TARGET_RESOURCES,
+                execution, depends_on=[last_replica_task.id],
+                on_error=True)
 
-            task_delete_disk_copy_resources = self._create_task(
-                instance, constants.TASK_TYPE_DELETE_DISK_COPY_RESOURCES,
-                execution, depends_on=[task_copy_disk.id], on_error=True)
+            deploy_replica_task = self._create_task(
+                instance, constants.TASK_TYPE_DEPLOY_REPLICA_INSTANCE,
+                execution, depends_on=[
+                    delete_source_resources_task.id,
+                    delete_destination_resources_task.id])
 
+            last_task = deploy_replica_task
             if not skip_os_morphing:
                 task_deploy_os_morphing_resources = self._create_task(
                     instance, constants.TASK_TYPE_DEPLOY_OS_MORPHING_RESOURCES,
-                    execution, depends_on=[task_delete_disk_copy_resources.id])
+                    execution, depends_on=[last_task.id])
 
                 task_osmorphing = self._create_task(
                     instance, constants.TASK_TYPE_OS_MORPHING,
@@ -775,17 +825,27 @@ class ConductorServerEndpoint(object):
                     execution, depends_on=[task_osmorphing.id],
                     on_error=True)
 
-                next_task = task_delete_os_morphing_resources
-            else:
-                next_task = task_delete_disk_copy_resources
+                last_task = task_delete_os_morphing_resources
+
+            if (constants.PROVIDER_TYPE_INSTANCE_FLAVOR in
+                    destination_provider_types):
+                get_optimal_flavor_task = self._create_task(
+                    instance, constants.TASK_TYPE_GET_OPTIMAL_FLAVOR,
+                    execution, depends_on=[last_task.id])
+                last_task = get_optimal_flavor_task
 
             self._create_task(
-                instance, constants.TASK_TYPE_FINALIZE_IMPORT_INSTANCE,
-                execution, depends_on=[next_task.id])
+                instance,
+                constants.TASK_TYPE_FINALIZE_REPLICA_INSTANCE_DEPLOYMENT,
+                execution, depends_on=[last_task.id])
 
             self._create_task(
                 instance,
-                constants.TASK_TYPE_CLEANUP_FAILED_IMPORT_INSTANCE,
+                constants.TASK_TYPE_CLEANUP_FAILED_REPLICA_INSTANCE_DEPLOYMENT,
+                execution, on_error=True)
+
+            self._create_task(
+                instance, constants.TASK_TYPE_DELETE_REPLICA_DISKS,
                 execution, on_error=True)
 
         db_api.add_migration(ctxt, migration)
@@ -822,14 +882,20 @@ class ConductorServerEndpoint(object):
         self._check_delete_reservation_for_transfer(migration)
 
     def _cancel_tasks_execution(self, ctxt, execution, force=False):
+        has_error_tasks = False
         has_running_tasks = False
         for task in execution.tasks:
+            if task.on_error:
+                # NOTE: always allow on_error tasks to execute
+                # as they may do required cleanup:
+                has_error_tasks = True
+                continue
+
             if task.status == constants.TASK_STATUS_RUNNING:
                 self._rpc_worker_client.cancel_task(
                     ctxt, task.host, task.id, task.process_id, force)
                 has_running_tasks = True
-            elif (task.status == constants.TASK_STATUS_PENDING and
-                    not task.on_error):
+            elif task.status == constants.TASK_STATUS_PENDING:
                 db_api.set_task_status(
                     ctxt, task.id, constants.TASK_STATUS_CANCELED)
 
@@ -861,7 +927,12 @@ class ConductorServerEndpoint(object):
                 LOG.error("A required endpoint could not be found")
                 LOG.exception(ex)
 
-        if not has_running_tasks:
+        # NOTE: only set the whole execution to 'ERROR' if nothing's
+        # running and no on_error=True tasks are pending.
+        # Otherwise, the lifecycle of the rest of the execution will
+        # be governed in `task_completed` when any of the
+        # running/pending tasks finish:
+        if not has_running_tasks and not has_error_tasks:
             self._set_tasks_execution_status(
                 ctxt, execution.id, constants.EXECUTION_STATUS_ERROR)
 
@@ -952,8 +1023,7 @@ class ConductorServerEndpoint(object):
                     ctxt, execution.action_id, task.instance,
                     {"volumes_info": None})
 
-        elif task_type in (
-                constants.TASK_TYPE_FINALIZE_IMPORT_INSTANCE,
+        elif task_type == (
                 constants.TASK_TYPE_FINALIZE_REPLICA_INSTANCE_DEPLOYMENT):
             # set 'transfer_result' in the 'base_transfer_action'
             # table if the task returned a result.

+ 20 - 0
coriolis/db/sqlalchemy/migrate_repo/versions/012_adds_migration_sync_fields.py

@@ -0,0 +1,20 @@
+import sqlalchemy
+from sqlalchemy import types
+
+
+def upgrade(migrate_engine):
+    meta = sqlalchemy.MetaData()
+    meta.bind = migrate_engine
+
+    migration = sqlalchemy.Table(
+        'migration', meta, autoload=True)
+
+    shutdown_instances = sqlalchemy.Column(
+        "shutdown_instances", sqlalchemy.Boolean,
+        nullable=False, default=False)
+    migration.create_column(shutdown_instances)
+
+    replication_count = sqlalchemy.Column(
+        "replication_count", sqlalchemy.Integer, default=0,
+        nullable=False)
+    migration.create_column(replication_count)

+ 14 - 0
coriolis/db/sqlalchemy/migrate_repo/versions/013_adds_task_index.py

@@ -0,0 +1,14 @@
+import sqlalchemy
+from sqlalchemy import types
+
+
+def upgrade(migrate_engine):
+    meta = sqlalchemy.MetaData()
+    meta.bind = migrate_engine
+
+    task = sqlalchemy.Table('task', meta, autoload=True)
+
+    index = sqlalchemy.Column(
+        "index", sqlalchemy.Integer, default=0, nullable=False)
+
+    task.create_column(index)

+ 5 - 0
coriolis/db/sqlalchemy/models.py

@@ -59,6 +59,7 @@ class Task(BASE, models.TimestampMixin, models.SoftDeleteMixin,
     task_type = sqlalchemy.Column(sqlalchemy.String(100), nullable=False)
     exception_details = sqlalchemy.Column(sqlalchemy.Text, nullable=True)
     depends_on = sqlalchemy.Column(types.List, nullable=True)
+    index = sqlalchemy.Column(sqlalchemy.Integer, nullable=True)
     on_error = sqlalchemy.Column(sqlalchemy.Boolean, nullable=False)
     # TODO(alexpilotti): Add soft delete filter
     events = orm.relationship(TaskEvent, cascade="all,delete",
@@ -149,6 +150,10 @@ class Migration(BaseTransferAction):
         sqlalchemy.ForeignKey('replica.id'), nullable=True)
     replica = orm.relationship(
         Replica, backref=orm.backref("migrations"), foreign_keys=[replica_id])
+    shutdown_instances = sqlalchemy.Column(
+        sqlalchemy.Boolean, nullable=False, default=False)
+    replication_count = sqlalchemy.Column(
+        sqlalchemy.Integer, nullable=False, default=2)
 
     __mapper_args__ = {
         'polymorphic_identity': 'migration',

+ 5 - 2
coriolis/migrations/api.py

@@ -11,12 +11,15 @@ class API(object):
     def migrate_instances(self, ctxt, origin_endpoint_id,
                           destination_endpoint_id, source_environment,
                           destination_environment, instances, network_map,
-                          storage_mappings, notes=None,
+                          storage_mappings, replication_count,
+                          shutdown_instances, notes=None,
                           skip_os_morphing=False):
         return self._rpc_client.migrate_instances(
             ctxt, origin_endpoint_id, destination_endpoint_id,
             source_environment, destination_environment, instances,
-            network_map, storage_mappings, notes, skip_os_morphing)
+            network_map, storage_mappings,
+            replication_count, shutdown_instances=shutdown_instances,
+            notes=notes, skip_os_morphing=skip_os_morphing)
 
     def deploy_replica_instances(self, ctxt, replica_id, clone_disks=False,
                                  force=False, skip_os_morphing=False):

+ 8 - 4
coriolis/providers/factory.py

@@ -18,9 +18,11 @@ CONF = cfg.CONF
 CONF.register_opts(serialization_opts)
 
 PROVIDER_TYPE_MAP = {
-    constants.PROVIDER_TYPE_EXPORT: base.BaseExportProvider,
+    # NOTE(aznashwan): these have been disabled following the transition from
+    # classical disk-export-based migrations to Replica-based ones:
+    # constants.PROVIDER_TYPE_EXPORT: base.BaseExportProvider,
+    # constants.PROVIDER_TYPE_IMPORT: base.BaseImportProvider,
     constants.PROVIDER_TYPE_REPLICA_EXPORT: base.BaseReplicaExportProvider,
-    constants.PROVIDER_TYPE_IMPORT: base.BaseImportProvider,
     constants.PROVIDER_TYPE_REPLICA_IMPORT: base.BaseReplicaImportProvider,
     constants.PROVIDER_TYPE_ENDPOINT: base.BaseEndpointProvider,
     constants.PROVIDER_TYPE_DESTINATION_ENDPOINT_OPTIONS:
@@ -72,8 +74,10 @@ def get_provider(
         platform_name, provider_type, event_handler, raise_if_not_found=True):
     for provider in CONF.providers:
         cls = utils.load_class(provider)
-        if (cls.platform == platform_name and
-                issubclass(cls, PROVIDER_TYPE_MAP[provider_type])):
+        parent = PROVIDER_TYPE_MAP.get(provider_type)
+        if not parent:
+            continue
+        if (cls.platform == platform_name and issubclass(cls, parent)):
             return cls(event_handler)
 
     if raise_if_not_found:

+ 3 - 5
coriolis/worker/rpc/server.py

@@ -368,7 +368,7 @@ class WorkerServerEndpoint(object):
     def validate_endpoint_source_environment(
             self, ctxt, platform_name, source_env):
         provider = providers_factory.get_provider(
-            platform_name, constants.PROVIDER_TYPE_EXPORT, None)
+            platform_name, constants.PROVIDER_TYPE_REPLICA_EXPORT, None)
         source_env_schema = provider.get_source_environment_schema()
 
         is_valid = True
@@ -420,13 +420,11 @@ class WorkerServerEndpoint(object):
             schema = provider.get_connection_info_schema()
             schemas["connection_info_schema"] = schema
 
-        if provider_type in [constants.PROVIDER_TYPE_IMPORT,
-                             constants.PROVIDER_TYPE_REPLICA_IMPORT]:
+        if provider_type == constants.PROVIDER_TYPE_REPLICA_IMPORT:
             schema = provider.get_target_environment_schema()
             schemas["destination_environment_schema"] = schema
 
-        if provider_type in [constants.PROVIDER_TYPE_EXPORT,
-                             constants.PROVIDER_TYPE_REPLICA_EXPORT]:
+        if provider_type == constants.PROVIDER_TYPE_REPLICA_EXPORT:
             schema = provider.get_source_environment_schema()
             schemas["source_environment_schema"] = schema