Просмотр исходного кода

Merge pull request #42 from aznashwan/fix-disk-sync-order

Ensure disk ordering is preserved during Replication.
Nashwan Azhari 6 лет назад
Родитель
Сommit
60f60d9909

+ 20 - 5
coriolis/providers/backup_writers.py

@@ -16,15 +16,17 @@ from coriolis import data_transfer
 from coriolis import exception
 from coriolis import utils
 
-LOG = logging.getLogger(__name__)
+
 CONF = cfg.CONF
 opts = [
     cfg.BoolOpt('compress_transfers',
-               default=True,
-               help='Use compression if possible during disk transfers'),
+                default=True,
+                help='Use compression if possible during disk transfers'),
 ]
 CONF.register_opts(opts)
 
+LOG = logging.getLogger(__name__)
+
 
 class BaseBackupWriterImpl(with_metaclass(abc.ABCMeta)):
     def __init__(self, path, disk_id):
@@ -213,8 +215,21 @@ class SSHBackupWriter(BaseBackupWriter):
     def _get_impl(self, path, disk_id):
         ssh = self._connect_ssh()
 
-        path = [v for v in self._volumes_info
-                if v["disk_id"] == disk_id][0]["volume_dev"]
+        matching_devs = [
+            v for v in self._volumes_info if v["disk_id"] == disk_id]
+
+        if not matching_devs:
+            base_msg = (
+                "Could not locate disk with ID '%s' in volumes_info" % disk_id)
+            LOG.error("%s: %s", base_msg, self._volumes_info)
+            raise exception.CoriolisException(base_msg)
+        elif len(matching_devs) > 1:
+            base_msg = (
+                "Multiple disks with ID '%s' in volumes_info" % disk_id)
+            LOG.error("%s: %s", base_msg, self._volumes_info)
+            raise exception.CoriolisException(base_msg)
+
+        path = matching_devs[0]["volume_dev"]
         impl = SSHBackupWriterImpl(path, disk_id)
 
         self._copy_helper_cmd(ssh)

+ 30 - 4
coriolis/schemas.py

@@ -10,6 +10,8 @@ import jsonschema
 from oslo_log import log as logging
 
 from coriolis import exception
+from coriolis import utils 
+
 
 LOG = logging.getLogger(__name__)
 
@@ -31,6 +33,11 @@ _CORIOLIS_SOURCE_OPTIONS_SCHEMA_NAME = "source_options_schema.json"
 _CORIOLIS_NETWORK_MAP_SCHEMA_NAME = "network_map_schema.json"
 _CORIOLIS_STORAGE_MAPPINGS_SCHEMA_NAME = "storage_mappings_schema.json"
 _CORIOLIS_VM_STORAGE_SCHEMA_NAME = "vm_storage_schema.json"
+_CORIOLIS_VOLUMES_INFO_SCHEMA_NAME = "volumes_info_schema.json"
+_CORIOLIS_DISK_SYNC_RESOURCES_INFO_SCHEMA_NAME = (
+    "disk_sync_resources_info_schema.json")
+_CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA_NAME = (
+    "disk_sync_resources_conn_info_schema.json")
 
 
 def get_schema(package_name, schema_name,
@@ -51,16 +58,26 @@ def get_schema(package_name, schema_name,
     return schema
 
 
-def validate_value(val, schema, format_checker=None):
+def validate_value(val, schema, format_checker=None, raise_on_error=True):
     """Simple wrapper for jsonschema.validate for usability.
-
     NOTE: silently passes empty schemas.
+
+    If `raise_on_error` is False, returns a boolean indicating whether
+    or not the validation was successful.
     """
     try:
         jsonschema.validate(val, schema, format_checker=format_checker)
     except jsonschema.exceptions.ValidationError as ex:
-        raise exception.SchemaValidationException(
-            "Schema validation failed: %s" % str(ex))
+        if raise_on_error:
+            raise exception.SchemaValidationException(
+                "Schema validation failed: %s" % str(ex))
+        else:
+            LOG.warn(
+                "Schema validation failed, ignoring: %s",
+                utils.get_exception_details())
+        return False
+
+    return True
 
 
 def validate_string(json_string, schema):
@@ -102,3 +119,12 @@ CORIOLIS_STORAGE_MAPPINGS_SCHEMA = get_schema(
 
 CORIOLIS_VM_STORAGE_SCHEMA = get_schema(
     __name__, _CORIOLIS_VM_STORAGE_SCHEMA_NAME)
+
+CORIOLIS_VOLUMES_INFO_SCHEMA = get_schema(
+    __name__, _CORIOLIS_VOLUMES_INFO_SCHEMA_NAME)
+
+CORIOLIS_DISK_SYNC_RESOURCES_INFO_SCHEMA = get_schema(
+    __name__, _CORIOLIS_DISK_SYNC_RESOURCES_INFO_SCHEMA_NAME)
+
+CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA = get_schema(
+    __name__, _CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA_NAME)

+ 38 - 0
coriolis/schemas/disk_sync_resources_conn_info_schema.json

@@ -0,0 +1,38 @@
+{
+  "$schema": "http://cloudbase.it/coriolis/schemas/disk_sync_resources_conn_info#",
+  "type": "object",
+  "description": "Object defining SSH connection parameters for a temporary minion VM which will carry out disk syncing and/or Linux OSMorphing",
+  "properties": {
+    "ip": {
+      "type": "string"
+    },
+    "port": {
+      "type": "integer"
+    },
+    "username": {
+      "type": "string"
+    },
+    "password": {
+      "$ref": "#/definitions/nullableString"
+    },
+    "pkey": {
+      "$ref": "#/definitions/nullableString"
+    },
+    "cert_pem": {
+      "type": "string"
+    },
+    "cert_key_pem": {
+      "type": "string"
+    }
+  },
+  "required": ["ip", "port", "username"],
+  "definitions": {
+    "nullableString": {
+      "oneOf": [{
+        "type": "string"
+      }, {
+        "type": "null"
+      }]
+    }
+  }
+}

+ 28 - 0
coriolis/schemas/disk_sync_resources_info_schema.json

@@ -0,0 +1,28 @@
+{
+  "$schema": "http://cloudbase.it/coriolis/schemas/disk_sync_resources_info#",
+  "type": "object",
+  "description": "Information returned after the 'DEPLOY_REPLICA_TARGET_RESOURCES' task and passed to 'REPLICATE_DISKS', as well as for 'DEPLOY_DISK_COPY_RESOURCES' and 'COPY_DISKS_DATA'. The only required property is the 'volumes_info', and the provider plugins may freely declare and use any other fields.",
+  "properties": {
+    "volumes_info": {
+      "type": "array",
+      "items": {
+        "type": "object",
+        "description": "Object with information on the replicated volumes. Provider plugins may add their fields as required.",
+        "properties": {
+          "disk_id": {
+            "type": "string",
+            "description": "String ID of the source disk the replicated volume corresponds to."
+          },
+          "volume_dev": {
+            "type": "string",
+            "description": "String device path (ex: /dev/sdc) from within the temporary minion VM where the disk was attached."
+          }
+        },
+        "required": ["disk_id", "volume_dev"],
+        "additionalProperties": true
+      }
+    }
+  },
+  "required": ["volumes_info"],
+  "additionalProperties": true
+}

+ 2 - 2
coriolis/schemas/os_morphing_resources_schema.json

@@ -1,5 +1,5 @@
 {
-  "$schema": "http://cloudbase.it/coriolis/schemas/import_info#",
+  "$schema": "http://cloudbase.it/coriolis/schemas/osmorphing_resources#",
   "type": "object",
   "properties": {
     "osmorphing_connection_info": {
@@ -27,7 +27,7 @@
           "type": "string"
         }
       },
-      "required": ["ip", "port"]
+      "required": ["ip", "port", "username"]
     },
     "osmorphing_info": {
       "type": "object",

+ 9 - 9
coriolis/schemas/vm_export_info_schema.json

@@ -76,7 +76,7 @@
             "type": "object",
             "properties": {
               "id": {
-                "$ref": "#/definitions/numberOrString"
+                "type": "string"
               },
               "format": {
                 "type": "string",
@@ -92,7 +92,7 @@
                 "type": "string"
               },
               "controller_id": {
-                "$ref": "#/definitions/numberOrString"
+                "type": "string"
               },
               "storage_backend_identifier": {
                 "type": "string",
@@ -118,13 +118,13 @@
             "type": "object",
             "properties": {
               "id": {
-                "$ref": "#/definitions/numberOrString"
+                "type": "string"
               },
               "unit_number": {
                 "$ref": "#/definitions/numberOrString"
               },
               "controller_id": {
-                "$ref": "#/definitions/numberOrString"
+                "type": "string"
               }
             },
             "required": [
@@ -157,7 +157,7 @@
                 "type": "integer"
               },
               "id": {
-                "$ref": "#/definitions/numberOrString"
+                "type": "string"
               },
               "mac_address": {
                 "$ref": "#/definitions/nullableString"
@@ -179,7 +179,7 @@
             "type": "object",
             "properties": {
               "id": {
-                "$ref": "#/definitions/numberOrString"
+                "type": "string"
               }
             }
           },
@@ -194,13 +194,13 @@
             "type": "object",
             "properties": {
               "id": {
-                "$ref": "#/definitions/numberOrString"
+                "type": "string"
               },
               "unit_number": {
                 "$ref": "#/definitions/numberOrString"
               },
               "controller_id": {
-                "$ref": "#/definitions/numberOrString"
+                "type": "string"
               }
             },
             "required": [
@@ -221,7 +221,7 @@
                 "$ref": "#/definitions/numberOrString"
               },
               "id": {
-                "$ref": "#/definitions/numberOrString"
+                "type": "string"
               }
             },
             "required": [

+ 16 - 0
coriolis/schemas/volumes_info_schema.json

@@ -0,0 +1,16 @@
+{
+  "$schema": "http://cloudbase.it/coriolis/schemas/volumes_info#",
+  "type": "array",
+  "description": "List of info on volumes replicated to a destination platform. Most fields inside the individual volumes info object are arbitrary and provider-specific, with the exception of 'disk_id'.",
+  "items": {
+    "type": "object",
+    "properties": {
+      "disk_id": {
+        "type": "string",
+        "description": "Unique identifier the *source* disk corresponding to this set of volume info had (should be export_info[devices][disks][id])"
+      }
+    },
+    "required": ["disk_id"],
+    "additionalProperties": true
+  }
+}

+ 26 - 6
coriolis/tasks/migration_tasks.py

@@ -76,13 +76,26 @@ class DeployDiskCopyResources(base.TaskRunner):
             ctxt, connection_info, target_environment,
             instance_deployment_info)
 
-        conn_info = resources_info[
+        instance_deployment_info = resources_info["instance_deployment_info"]
+        schemas.validate_value(
+            instance_deployment_info,
+            schemas.CORIOLIS_DISK_SYNC_RESOURCES_INFO_SCHEMA,
+            # NOTE: we avoid raising so that the cleanup task
+            # can [try] to deal with the temporary resources.
+            raise_on_error=False)
+
+        disk_sync_conn_info = resources_info[
             "instance_deployment_info"]["disk_sync_connection_info"]
-        conn_info = base.marshal_migr_conn_info(conn_info)
-        task_info["instance_deployment_info"] = resources_info[
-            "instance_deployment_info"]
-        task_info["instance_deployment_info"][
-            "disk_sync_connection_info"] = conn_info
+        disk_sync_conn_info = base.marshal_migr_conn_info(
+            disk_sync_conn_info)
+        schemas.validate_value(
+            disk_sync_conn_info,
+            schemas.CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA)
+        instance_deployment_info[
+            'disk_sync_connection_info'] = disk_sync_conn_info
+
+        task_info["instance_deployment_info"] = instance_deployment_info
+
         # We need to retain export info until after disk sync
         # TODO(gsamfira): remove this when we implement multi-worker, and by
         # extension some external storage for needed resources (like swift)
@@ -95,7 +108,14 @@ class CopyDiskData(base.TaskRunner):
     def run(self, ctxt, instance, origin, destination, task_info,
             event_handler):
         instance_deployment_info = task_info["instance_deployment_info"]
+        schemas.validate_value(
+            instance_deployment_info['disk_sync_connection_info'],
+            schemas.CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA)
+
         volumes_info = instance_deployment_info["volumes_info"]
+        schemas.validate_value(
+            {"volumes_info": volumes_info},
+            schemas.CORIOLIS_DISK_SYNC_RESOURCES_INFO_SCHEMA)
         LOG.info("Volumes info is: %r" % volumes_info)
 
         image_paths = [i.get("disk_image_uri") for i in volumes_info]

+ 126 - 10
coriolis/tasks/replica_tasks.py

@@ -21,6 +21,40 @@ def _get_volumes_info(task_info):
     return volumes_info
 
 
+def _check_ensure_volumes_info_ordering(export_info, volumes_info):
+    """ Returns a new list of volumes_info, ensuring that the order of
+    the disks in 'volumes_info' is consistent with the order that the
+    disks appear in 'export_info[devices][disks]'
+    """
+    instance = export_info.get(
+        'instance_name',
+        export_info.get('name', export_info['id']))
+    ordered_volumes_info = []
+    for disk in export_info['devices']['disks']:
+        disk_id = disk['id']
+        matching_volumes = [
+            vol for vol in volumes_info if vol['disk_id'] == disk_id]
+        if not matching_volumes:
+            raise exception.CoriolisException(
+                "Could not find source disk '%s' (ID '%s') in Replica "
+                "volumes info: %s" % (disk, disk_id, volumes_info))
+        elif len(matching_volumes) > 1:
+            raise exception.CoriolisException(
+                "Multiple disks with ID '%s' foind in Replica "
+                "volumes info: %s" % (disk_id, volumes_info))
+
+        ordered_volumes_info.append(matching_volumes[0])
+
+    LOG.debug(
+        "volumes_info returned by provider for instance "
+        "'%s': %s", instance, volumes_info)
+    LOG.debug(
+        "volumes_info for instance '%s' after "
+        "reordering: %s", instance, ordered_volumes_info)
+
+    return ordered_volumes_info
+
+
 class GetInstanceInfoTask(base.TaskRunner):
     def run(self, ctxt, instance, origin, destination, task_info,
             event_handler):
@@ -63,14 +97,27 @@ class ReplicateDisksTask(base.TaskRunner):
             origin["type"], constants.PROVIDER_TYPE_REPLICA_EXPORT,
             event_handler)
         connection_info = base.get_connection_info(ctxt, origin)
+        export_info = task_info["export_info"]
 
         volumes_info = _get_volumes_info(task_info)
-
+        schemas.validate_value(
+            {"volumes_info": volumes_info},
+            schemas.CORIOLIS_DISK_SYNC_RESOURCES_INFO_SCHEMA)
+
+        migr_source_conn_info = task_info["migr_source_connection_info"]
+        if migr_source_conn_info:
+            schemas.validate_value(
+                migr_source_conn_info,
+                schemas.CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA)
         migr_source_conn_info = base.unmarshal_migr_conn_info(
-            task_info["migr_source_connection_info"])
+            migr_source_conn_info)
 
+        migr_target_conn_info = task_info["migr_target_connection_info"]
+        schemas.validate_value(
+            migr_target_conn_info,
+            schemas.CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA)
         migr_target_conn_info = base.unmarshal_migr_conn_info(
-            task_info["migr_target_connection_info"])
+            migr_target_conn_info)
 
         incremental = task_info.get("incremental", True)
 
@@ -80,6 +127,11 @@ class ReplicateDisksTask(base.TaskRunner):
             ctxt, connection_info, source_environment, instance,
             migr_source_conn_info, migr_target_conn_info, volumes_info,
             incremental)
+        schemas.validate_value(
+            volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)
+
+        volumes_info = _check_ensure_volumes_info_ordering(
+            export_info, volumes_info)
 
         task_info["volumes_info"] = volumes_info
 
@@ -97,11 +149,19 @@ class DeployReplicaDisksTask(base.TaskRunner):
             event_handler)
         connection_info = base.get_connection_info(ctxt, destination)
 
-        volumes_info = task_info.get("volumes_info") or []
+        volumes_info = task_info.get("volumes_info", [])
+        if volumes_info is None:
+            # In case Replica disks were deleted:
+            volumes_info = []
 
         volumes_info = provider.deploy_replica_disks(
             ctxt, connection_info, target_environment, instance, export_info,
             volumes_info)
+        schemas.validate_value(
+            volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)
+
+        volumes_info = _check_ensure_volumes_info_ordering(
+            export_info, volumes_info)
 
         task_info["volumes_info"] = volumes_info
 
@@ -118,10 +178,14 @@ class DeleteReplicaDisksTask(base.TaskRunner):
 
         volumes_info = _get_volumes_info(task_info)
 
-        provider.delete_replica_disks(
+        volumes_info = provider.delete_replica_disks(
             ctxt, connection_info, volumes_info)
+        if volumes_info:
+            LOG.warn(
+                "'volumes_info' should have been void after disk "
+                "deletion: %s" % volumes_info)
 
-        task_info["volumes_info"] = None
+        task_info["volumes_info"] = []
 
         return task_info
 
@@ -140,8 +204,17 @@ class DeployReplicaSourceResourcesTask(base.TaskRunner):
 
         task_info["migr_source_resources"] = replica_resources_info[
             "migr_resources"]
-        migr_connection_info = base.marshal_migr_conn_info(
-            replica_resources_info["connection_info"])
+        migr_connection_info = replica_resources_info.get("connection_info")
+        if migr_connection_info:
+            migr_connection_info = base.marshal_migr_conn_info(
+                migr_connection_info)
+            schemas.validate_value(
+                migr_connection_info,
+                schemas.CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA,
+                # NOTE: we avoid raising so that the cleanup task
+                # can [try] to deal with the temporary resources.
+                raise_on_error=False)
+
         task_info["migr_source_connection_info"] = migr_connection_info
 
         return task_info
@@ -173,6 +246,7 @@ class DeployReplicaTargetResourcesTask(base.TaskRunner):
     def run(self, ctxt, instance, origin, destination, task_info,
             event_handler):
         target_environment = destination.get("target_environment") or {}
+        export_info = task_info['export_info']
 
         provider = providers_factory.get_provider(
             destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
@@ -183,13 +257,30 @@ class DeployReplicaTargetResourcesTask(base.TaskRunner):
 
         replica_resources_info = provider.deploy_replica_target_resources(
             ctxt, connection_info, target_environment, volumes_info)
+        schemas.validate_value(
+            replica_resources_info,
+            schemas.CORIOLIS_DISK_SYNC_RESOURCES_INFO_SCHEMA,
+            # NOTE: we avoid raising so that the cleanup task
+            # can [try] to deal with the temporary resources.
+            raise_on_error=False)
+
+        volumes_info = _check_ensure_volumes_info_ordering(
+            export_info, replica_resources_info["volumes_info"])
 
-        task_info["volumes_info"] = replica_resources_info["volumes_info"]
+        task_info["volumes_info"] = volumes_info
         task_info["migr_target_resources"] = replica_resources_info[
             "migr_resources"]
 
+        migr_connection_info = replica_resources_info["connection_info"]
         migr_connection_info = base.marshal_migr_conn_info(
-            replica_resources_info["connection_info"])
+            migr_connection_info)
+        schemas.validate_value(
+            migr_connection_info,
+            schemas.CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA,
+            # NOTE: we avoid raising so that the cleanup task
+            # can [try] to deal with the temporary resources.
+            raise_on_error=False)
+
         task_info["migr_target_connection_info"] = migr_connection_info
 
         return task_info
@@ -292,11 +383,17 @@ class CreateReplicaDiskSnapshotsTask(base.TaskRunner):
             destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
             event_handler)
         connection_info = base.get_connection_info(ctxt, destination)
+        export_info = task_info['export_info']
 
         volumes_info = _get_volumes_info(task_info)
 
         volumes_info = provider.create_replica_disk_snapshots(
             ctxt, connection_info, volumes_info)
+        schemas.validate_value(
+            volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)
+
+        volumes_info = _check_ensure_volumes_info_ordering(
+            export_info, volumes_info)
 
         task_info["volumes_info"] = volumes_info
 
@@ -306,6 +403,7 @@ class CreateReplicaDiskSnapshotsTask(base.TaskRunner):
 class DeleteReplicaDiskSnapshotsTask(base.TaskRunner):
     def run(self, ctxt, instance, origin, destination, task_info,
             event_handler):
+        export_info = task_info['export_info']
         provider = providers_factory.get_provider(
             destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
             event_handler)
@@ -315,6 +413,11 @@ class DeleteReplicaDiskSnapshotsTask(base.TaskRunner):
 
         volumes_info = provider.delete_replica_disk_snapshots(
             ctxt, connection_info, volumes_info)
+        schemas.validate_value(
+            volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)
+
+        volumes_info = _check_ensure_volumes_info_ordering(
+            export_info, volumes_info)
 
         task_info["volumes_info"] = volumes_info
 
@@ -328,11 +431,17 @@ class RestoreReplicaDiskSnapshotsTask(base.TaskRunner):
             destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
             event_handler)
         connection_info = base.get_connection_info(ctxt, destination)
+        export_info = task_info['export_info']
 
         volumes_info = _get_volumes_info(task_info)
 
         volumes_info = provider.restore_replica_disk_snapshots(
             ctxt, connection_info, volumes_info)
+        schemas.validate_value(
+            volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)
+
+        volumes_info = _check_ensure_volumes_info_ordering(
+            export_info, volumes_info)
 
         task_info["volumes_info"] = volumes_info
 
@@ -458,6 +567,8 @@ class UpdateSourceReplicaTask(base.TaskRunner):
             source_provider.check_update_source_environment_params(
                 ctxt, origin_connection_info, instance, volumes_info,
                 old_source_env, new_source_env))
+        schemas.validate_value(
+            volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)
 
         task_info['volumes_info'] = volumes_info
 
@@ -497,6 +608,11 @@ class UpdateDestinationReplicaTask(base.TaskRunner):
             destination_provider.check_update_destination_environment_params(
                 ctxt, destination_connection_info, export_info, volumes_info,
                 old_destination_env, new_destination_env))
+        schemas.validate_value(
+            volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)
+
+        volumes_info = _check_ensure_volumes_info_ordering(
+            export_info, volumes_info)
 
         task_info['volumes_info'] = volumes_info