Просмотр исходного кода

Add dedicated schemas for the disk sync resources.

Added the following schemas:
    - disk_sync_resources_conn_info_schema.json -- for defining SSH
    connection params for temporary minion VMs
    - disk_sync_resources_info_schema.json -- schema defining
    volumes_info for disk syncing (both for Migrations and Replicas)
Nashwan Azhari 6 лет назад
Родитель
Сommit
2f1e96f69e

+ 26 - 4
coriolis/schemas.py

@@ -10,6 +10,8 @@ import jsonschema
 from oslo_log import log as logging
 
 from coriolis import exception
+from coriolis import utils 
+
 
 LOG = logging.getLogger(__name__)
 
@@ -32,6 +34,10 @@ _CORIOLIS_NETWORK_MAP_SCHEMA_NAME = "network_map_schema.json"
 _CORIOLIS_STORAGE_MAPPINGS_SCHEMA_NAME = "storage_mappings_schema.json"
 _CORIOLIS_VM_STORAGE_SCHEMA_NAME = "vm_storage_schema.json"
 _CORIOLIS_VOLUMES_INFO_SCHEMA_NAME = "volumes_info_schema.json"
+_CORIOLIS_DISK_SYNC_RESOURCES_INFO_SCHEMA_NAME = (
+    "disk_sync_resources_info_schema.json")
+_CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA_NAME = (
+    "disk_sync_resources_conn_info_schema.json")
 
 
 def get_schema(package_name, schema_name,
@@ -52,16 +58,26 @@ def get_schema(package_name, schema_name,
     return schema
 
 
-def validate_value(val, schema, format_checker=None):
+def validate_value(val, schema, format_checker=None, raise_on_error=True):
     """Simple wrapper for jsonschema.validate for usability.
-
     NOTE: silently passes empty schemas.
+
+    If `raise_on_error` is False, returns a boolean indicating whether
+    or not the validation was successful.
     """
     try:
         jsonschema.validate(val, schema, format_checker=format_checker)
     except jsonschema.exceptions.ValidationError as ex:
-        raise exception.SchemaValidationException(
-            "Schema validation failed: %s" % str(ex))
+        if raise_on_error:
+            raise exception.SchemaValidationException(
+                "Schema validation failed: %s" % str(ex))
+        else:
+            LOG.warn(
+                "Schema validation failed, ignoring: %s",
+                utils.get_exception_details())
+        return False
+
+    return True
 
 
 def validate_string(json_string, schema):
@@ -106,3 +122,9 @@ CORIOLIS_VM_STORAGE_SCHEMA = get_schema(
 
 CORIOLIS_VOLUMES_INFO_SCHEMA = get_schema(
     __name__, _CORIOLIS_VOLUMES_INFO_SCHEMA_NAME)
+
+CORIOLIS_DISK_SYNC_RESOURCES_INFO_SCHEMA = get_schema(
+    __name__, _CORIOLIS_DISK_SYNC_RESOURCES_INFO_SCHEMA_NAME)
+
+CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA = get_schema(
+    __name__, _CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA_NAME)

+ 38 - 0
coriolis/schemas/disk_sync_resources_conn_info_schema.json

@@ -0,0 +1,38 @@
+{
+  "$schema": "http://cloudbase.it/coriolis/schemas/disk_sync_resources_conn_info#",
+  "type": "object",
+  "description": "Object defining SSH connection parameters for a temporary minion VM which will carry out disk syncing and/or Linux OSMorphing",
+  "properties": {
+    "ip": {
+      "type": "string"
+    },
+    "port": {
+      "type": "integer"
+    },
+    "username": {
+      "type": "string"
+    },
+    "password": {
+      "$ref": "#/definitions/nullableString"
+    },
+    "pkey": {
+      "$ref": "#/definitions/nullableString"
+    },
+    "cert_pem": {
+      "type": "string"
+    },
+    "cert_key_pem": {
+      "type": "string"
+    }
+  },
+  "required": ["ip", "port", "username"],
+  "definitions": {
+    "nullableString": {
+      "oneOf": [{
+        "type": "string"
+      }, {
+        "type": "null"
+      }]
+    }
+  }
+}

+ 28 - 0
coriolis/schemas/disk_sync_resources_info_schema.json

@@ -0,0 +1,28 @@
+{
+  "$schema": "http://cloudbase.it/coriolis/schemas/disk_sync_resources_info#",
+  "type": "object",
+  "description": "Information returned after the 'DEPLOY_REPLICA_TARGET_RESOURCES' task and passed to 'REPLICATE_DISKS', as well as for 'DEPLOY_DISK_COPY_RESOURCES' and 'COPY_DISKS_DATA'. The only required property is the 'volumes_info', and the provider plugins may freely declare and use any other fields.",
+  "properties": {
+    "volumes_info": {
+      "type": "array",
+      "items": {
+        "type": "object",
+        "description": "Object with information on the replicated volumes. Provider plugins may add their fields as required.",
+        "properties": {
+          "disk_id": {
+            "type": "string",
+            "description": "String ID of the source disk the replicated volume corresponds to."
+          },
+          "volume_dev": {
+            "type": "string",
+            "description": "String device path (ex: /dev/sdc) from within the temporary minion VM where the disk was attached."
+          }
+        },
+        "required": ["disk_id", "volume_dev"],
+        "additionalProperties": true
+      }
+    }
+  },
+  "required": ["volumes_info"],
+  "additionalProperties": true
+}

+ 1 - 1
coriolis/schemas/os_morphing_resources_schema.json

@@ -27,7 +27,7 @@
           "type": "string"
         }
       },
-      "required": ["ip", "port"]
+      "required": ["ip", "port", "username"]
     },
     "osmorphing_info": {
       "type": "object",

+ 26 - 6
coriolis/tasks/migration_tasks.py

@@ -76,13 +76,26 @@ class DeployDiskCopyResources(base.TaskRunner):
             ctxt, connection_info, target_environment,
             instance_deployment_info)
 
-        conn_info = resources_info[
+        instance_deployment_info = resources_info["instance_deployment_info"]
+        schemas.validate_value(
+            instance_deployment_info,
+            schemas.CORIOLIS_DISK_SYNC_RESOURCES_INFO_SCHEMA,
+            # NOTE: we avoid raising so that the cleanup task
+            # can [try] to deal with the temporary resources.
+            raise_on_error=False)
+
+        disk_sync_conn_info = resources_info[
             "instance_deployment_info"]["disk_sync_connection_info"]
-        conn_info = base.marshal_migr_conn_info(conn_info)
-        task_info["instance_deployment_info"] = resources_info[
-            "instance_deployment_info"]
-        task_info["instance_deployment_info"][
-            "disk_sync_connection_info"] = conn_info
+        disk_sync_conn_info = base.marshal_migr_conn_info(
+            disk_sync_conn_info)
+        schemas.validate_value(
+            disk_sync_conn_info,
+            schemas.CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA)
+        instance_deployment_info[
+            'disk_sync_connection_info'] = disk_sync_conn_info
+
+        task_info["instance_deployment_info"] = instance_deployment_info
+
         # We need to retain export info until after disk sync
         # TODO(gsamfira): remove this when we implement multi-worker, and by
         # extension some external storage for needed resources (like swift)
@@ -95,7 +108,14 @@ class CopyDiskData(base.TaskRunner):
     def run(self, ctxt, instance, origin, destination, task_info,
             event_handler):
         instance_deployment_info = task_info["instance_deployment_info"]
+        schemas.validate_value(
+            instance_deployment_info['disk_sync_connection_info'],
+            schemas.CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA)
+
         volumes_info = instance_deployment_info["volumes_info"]
+        schemas.validate_value(
+            {"volumes_info": volumes_info},
+            schemas.CORIOLIS_DISK_SYNC_RESOURCES_INFO_SCHEMA)
         LOG.info("Volumes info is: %r" % volumes_info)
 
         image_paths = [i.get("disk_image_uri") for i in volumes_info]

+ 59 - 14
coriolis/tasks/replica_tasks.py

@@ -101,13 +101,23 @@ class ReplicateDisksTask(base.TaskRunner):
 
         volumes_info = _get_volumes_info(task_info)
         schemas.validate_value(
-            volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)
-
+            {"volumes_info": volumes_info},
+            schemas.CORIOLIS_DISK_SYNC_RESOURCES_INFO_SCHEMA)
+
+        migr_source_conn_info = task_info["migr_source_connection_info"]
+        if migr_source_conn_info:
+            schemas.validate_value(
+                migr_source_conn_info,
+                schemas.CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA)
         migr_source_conn_info = base.unmarshal_migr_conn_info(
-            task_info["migr_source_connection_info"])
+            migr_source_conn_info)
 
+        migr_target_conn_info = task_info["migr_target_connection_info"]
+        schemas.validate_value(
+            migr_target_conn_info,
+            schemas.CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA)
         migr_target_conn_info = base.unmarshal_migr_conn_info(
-            task_info["migr_target_connection_info"])
+            migr_target_conn_info)
 
         incremental = task_info.get("incremental", True)
 
@@ -168,10 +178,14 @@ class DeleteReplicaDisksTask(base.TaskRunner):
 
         volumes_info = _get_volumes_info(task_info)
 
-        provider.delete_replica_disks(
+        volumes_info = provider.delete_replica_disks(
             ctxt, connection_info, volumes_info)
+        if volumes_info:
+            LOG.warn(
+                "'volumes_info' should have been void after disk "
+                "deletion: %s" % volumes_info)
 
-        task_info["volumes_info"] = None
+        task_info["volumes_info"] = []
 
         return task_info
 
@@ -190,8 +204,17 @@ class DeployReplicaSourceResourcesTask(base.TaskRunner):
 
         task_info["migr_source_resources"] = replica_resources_info[
             "migr_resources"]
-        migr_connection_info = base.marshal_migr_conn_info(
-            replica_resources_info["connection_info"])
+        migr_connection_info = replica_resources_info.get("connection_info")
+        if migr_connection_info:
+            migr_connection_info = base.marshal_migr_conn_info(
+                migr_connection_info)
+            schemas.validate_value(
+                migr_connection_info,
+                schemas.CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA,
+                # NOTE: we avoid raising so that the cleanup task
+                # can [try] to deal with the temporary resources.
+                raise_on_error=False)
+
         task_info["migr_source_connection_info"] = migr_connection_info
 
         return task_info
@@ -223,6 +246,7 @@ class DeployReplicaTargetResourcesTask(base.TaskRunner):
     def run(self, ctxt, instance, origin, destination, task_info,
             event_handler):
         target_environment = destination.get("target_environment") or {}
+        export_info = task_info['export_info']
 
         provider = providers_factory.get_provider(
             destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
@@ -233,13 +257,30 @@ class DeployReplicaTargetResourcesTask(base.TaskRunner):
 
         replica_resources_info = provider.deploy_replica_target_resources(
             ctxt, connection_info, target_environment, volumes_info)
+        schemas.validate_value(
+            replica_resources_info,
+            schemas.CORIOLIS_DISK_SYNC_RESOURCES_INFO_SCHEMA,
+            # NOTE: we avoid raising so that the cleanup task
+            # can [try] to deal with the temporary resources.
+            raise_on_error=False)
 
-        task_info["volumes_info"] = replica_resources_info["volumes_info"]
+        volumes_info = _check_ensure_volumes_info_ordering(
+            export_info, replica_resources_info["volumes_info"])
+
+        task_info["volumes_info"] = volumes_info
         task_info["migr_target_resources"] = replica_resources_info[
             "migr_resources"]
 
+        migr_connection_info = replica_resources_info["connection_info"]
         migr_connection_info = base.marshal_migr_conn_info(
-            replica_resources_info["connection_info"])
+            migr_connection_info)
+        schemas.validate_value(
+            migr_connection_info,
+            schemas.CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA,
+            # NOTE: we avoid raising so that the cleanup task
+            # can [try] to deal with the temporary resources.
+            raise_on_error=False)
+
         task_info["migr_target_connection_info"] = migr_connection_info
 
         return task_info
@@ -362,6 +403,7 @@ class CreateReplicaDiskSnapshotsTask(base.TaskRunner):
 class DeleteReplicaDiskSnapshotsTask(base.TaskRunner):
     def run(self, ctxt, instance, origin, destination, task_info,
             event_handler):
+        export_info = task_info['export_info']
         provider = providers_factory.get_provider(
             destination["type"], constants.PROVIDER_TYPE_REPLICA_IMPORT,
             event_handler)
@@ -371,10 +413,11 @@ class DeleteReplicaDiskSnapshotsTask(base.TaskRunner):
 
         volumes_info = provider.delete_replica_disk_snapshots(
             ctxt, connection_info, volumes_info)
-        if volumes_info:
-            raise exception.CoriolisException(
-                "'volumes_info' should be void after disk deletion: %s" % (
-                    volumes_info))
+        schemas.validate_value(
+            volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)
+
+        volumes_info = _check_ensure_volumes_info_ordering(
+            export_info, volumes_info)
 
         task_info["volumes_info"] = volumes_info
 
@@ -524,6 +567,8 @@ class UpdateSourceReplicaTask(base.TaskRunner):
             source_provider.check_update_source_environment_params(
                 ctxt, origin_connection_info, instance, volumes_info,
                 old_source_env, new_source_env))
+        schemas.validate_value(
+            volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)
 
         task_info['volumes_info'] = volumes_info