瀏覽代碼

Ensure disk ordering is preserved during Replication.

Coriolis-core will not both validate the schema of the returned
'volumes_info' of each Replica task, as well as reorder the returned
volumes into to match the exact ordering found in
export_info[devices][disks].
Nashwan Azhari 6 年之前
父節點
當前提交
f4ce6baa05

+ 20 - 5
coriolis/providers/backup_writers.py

@@ -16,15 +16,17 @@ from coriolis import data_transfer
 from coriolis import exception
 from coriolis import utils
 
-LOG = logging.getLogger(__name__)
+
 CONF = cfg.CONF
 opts = [
     cfg.BoolOpt('compress_transfers',
-               default=True,
-               help='Use compression if possible during disk transfers'),
+                default=True,
+                help='Use compression if possible during disk transfers'),
 ]
 CONF.register_opts(opts)
 
+LOG = logging.getLogger(__name__)
+
 
 class BaseBackupWriterImpl(with_metaclass(abc.ABCMeta)):
     def __init__(self, path, disk_id):
@@ -213,8 +215,21 @@ class SSHBackupWriter(BaseBackupWriter):
     def _get_impl(self, path, disk_id):
         ssh = self._connect_ssh()
 
-        path = [v for v in self._volumes_info
-                if v["disk_id"] == disk_id][0]["volume_dev"]
+        matching_devs = [
+            v for v in self._volumes_info if v["disk_id"] == disk_id]
+
+        if not matching_devs:
+            base_msg = (
+                "Could not locate disk with ID '%s' in volumes_info" % disk_id)
+            LOG.error("%s: %s", base_msg, self._volumes_info)
+            raise exception.CoriolisException(base_msg)
+        elif len(matching_devs) > 1:
+            base_msg = (
+                "Multiple disks with ID '%s' in volumes_info" % disk_id)
+            LOG.error("%s: %s", base_msg, self._volumes_info)
+            raise exception.CoriolisException(base_msg)
+
+        path = matching_devs[0]["volume_dev"]
         impl = SSHBackupWriterImpl(path, disk_id)
 
         self._copy_helper_cmd(ssh)

+ 4 - 0
coriolis/schemas.py

@@ -31,6 +31,7 @@ _CORIOLIS_SOURCE_OPTIONS_SCHEMA_NAME = "source_options_schema.json"
 _CORIOLIS_NETWORK_MAP_SCHEMA_NAME = "network_map_schema.json"
 _CORIOLIS_STORAGE_MAPPINGS_SCHEMA_NAME = "storage_mappings_schema.json"
 _CORIOLIS_VM_STORAGE_SCHEMA_NAME = "vm_storage_schema.json"
+_CORIOLIS_VOLUMES_INFO_SCHEMA_NAME = "volumes_info_schema.json"
 
 
 def get_schema(package_name, schema_name,
@@ -102,3 +103,6 @@ CORIOLIS_STORAGE_MAPPINGS_SCHEMA = get_schema(
 
 CORIOLIS_VM_STORAGE_SCHEMA = get_schema(
     __name__, _CORIOLIS_VM_STORAGE_SCHEMA_NAME)
+
+CORIOLIS_VOLUMES_INFO_SCHEMA = get_schema(
+    __name__, _CORIOLIS_VOLUMES_INFO_SCHEMA_NAME)

+ 1 - 1
coriolis/schemas/os_morphing_resources_schema.json

@@ -1,5 +1,5 @@
 {
-  "$schema": "http://cloudbase.it/coriolis/schemas/import_info#",
+  "$schema": "http://cloudbase.it/coriolis/schemas/osmorphing_resources#",
   "type": "object",
   "properties": {
     "osmorphing_connection_info": {

+ 16 - 0
coriolis/schemas/volumes_info_schema.json

@@ -0,0 +1,16 @@
+{
+  "$schema": "http://cloudbase.it/coriolis/schemas/volumes_info#",
+  "type": "array",
+  "description": "List of info on volumes replicated to a destination platform. Most fields inside the individual volumes info object are arbitrary and provider-specific, with the exception of 'disk_id'.",
+  "items": {
+    "type": "object",
+    "properties": {
+      "disk_id": {
+        "type": "string",
+        "description": "Unique identifier the *source* disk corresponding to this set of volume info had (should be export_info[devices][disks][id])"
+      }
+    },
+    "required": ["disk_id"],
+    "additionalProperties": true
+  }
+}

+ 72 - 1
coriolis/tasks/replica_tasks.py

@@ -21,6 +21,40 @@ def _get_volumes_info(task_info):
     return volumes_info
 
 
+def _check_ensure_volumes_info_ordering(export_info, volumes_info):
+    """ Returns a new list of volumes_info, ensuring that the order of
+    the disks in 'volumes_info' is consistent with the order that the
+    disks appear in 'export_info[devices][disks]'
+    """
+    instance = export_info.get(
+        'instance_name',
+        export_info.get('name', export_info['id']))
+    ordered_volumes_info = []
+    for disk in export_info['devices']['disks']:
+        disk_id = disk['id']
+        matching_volumes = [
+            vol for vol in volumes_info if vol['disk_id'] == disk_id]
+        if not matching_volumes:
+            raise exception.CoriolisException(
+                "Could not find source disk '%s' (ID '%s') in Replica "
+                "volumes info: %s" % (disk, disk_id, volumes_info))
+        elif len(matching_volumes) > 1:
+            raise exception.CoriolisException(
+                "Multiple disks with ID '%s' foind in Replica "
+                "volumes info: %s" % (disk_id, volumes_info))
+
+        ordered_volumes_info.append(matching_volumes[0])
+
+    LOG.debug(
+        "volumes_info returned by provider for instance "
+        "'%s': %s", instance, volumes_info)
+    LOG.debug(
+        "volumes_info for instance '%s' after "
+        "reordering: %s", instance, ordered_volumes_info)
+
+    return ordered_volumes_info
+
+
 class GetInstanceInfoTask(base.TaskRunner):
     def run(self, ctxt, instance, origin, destination, task_info,
             event_handler):
@@ -63,8 +97,11 @@ class ReplicateDisksTask(base.TaskRunner):
             origin["type"], constants.PROVIDER_TYPE_REPLICA_EXPORT,
             event_handler)
         connection_info = base.get_connection_info(ctxt, origin)
+        export_info = task_info["export_info"]
 
         volumes_info = _get_volumes_info(task_info)
+        schemas.validate_value(
+            volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)
 
         migr_source_conn_info = base.unmarshal_migr_conn_info(
             task_info["migr_source_connection_info"])
@@ -80,6 +117,11 @@ class ReplicateDisksTask(base.TaskRunner):
             ctxt, connection_info, source_environment, instance,
             migr_source_conn_info, migr_target_conn_info, volumes_info,
             incremental)
+        schemas.validate_value(
+            volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)
+
+        volumes_info = _check_ensure_volumes_info_ordering(
+            export_info, volumes_info)
 
         task_info["volumes_info"] = volumes_info
 
@@ -97,11 +139,19 @@ class DeployReplicaDisksTask(base.TaskRunner):
             event_handler)
         connection_info = base.get_connection_info(ctxt, destination)
 
-        volumes_info = task_info.get("volumes_info") or []
+        volumes_info = task_info.get("volumes_info", [])
+        if volumes_info is None:
+            # In case Replica disks were deleted:
+            volumes_info = []
 
         volumes_info = provider.deploy_replica_disks(
             ctxt, connection_info, target_environment, instance, export_info,
             volumes_info)
+        schemas.validate_value(
+            volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)
+
+        volumes_info = _check_ensure_volumes_info_ordering(
+            export_info, volumes_info)
 
         task_info["volumes_info"] = volumes_info
 
@@ -292,11 +342,17 @@ class CreateReplicaDiskSnapshotsTask(base.TaskRunner):
             destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
             event_handler)
         connection_info = base.get_connection_info(ctxt, destination)
+        export_info = task_info['export_info']
 
         volumes_info = _get_volumes_info(task_info)
 
         volumes_info = provider.create_replica_disk_snapshots(
             ctxt, connection_info, volumes_info)
+        schemas.validate_value(
+            volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)
+
+        volumes_info = _check_ensure_volumes_info_ordering(
+            export_info, volumes_info)
 
         task_info["volumes_info"] = volumes_info
 
@@ -315,6 +371,10 @@ class DeleteReplicaDiskSnapshotsTask(base.TaskRunner):
 
         volumes_info = provider.delete_replica_disk_snapshots(
             ctxt, connection_info, volumes_info)
+        if volumes_info:
+            raise exception.CoriolisException(
+                "'volumes_info' should be void after disk deletion: %s" % (
+                    volumes_info))
 
         task_info["volumes_info"] = volumes_info
 
@@ -328,11 +388,17 @@ class RestoreReplicaDiskSnapshotsTask(base.TaskRunner):
             destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
             event_handler)
         connection_info = base.get_connection_info(ctxt, destination)
+        export_info = task_info['export_info']
 
         volumes_info = _get_volumes_info(task_info)
 
         volumes_info = provider.restore_replica_disk_snapshots(
             ctxt, connection_info, volumes_info)
+        schemas.validate_value(
+            volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)
+
+        volumes_info = _check_ensure_volumes_info_ordering(
+            export_info, volumes_info)
 
         task_info["volumes_info"] = volumes_info
 
@@ -497,6 +563,11 @@ class UpdateDestinationReplicaTask(base.TaskRunner):
             destination_provider.check_update_destination_environment_params(
                 ctxt, destination_connection_info, export_info, volumes_info,
                 old_destination_env, new_destination_env))
+        schemas.validate_value(
+            volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)
+
+        volumes_info = _check_ensure_volumes_info_ordering(
+            export_info, volumes_info)
 
         task_info['volumes_info'] = volumes_info