Kaynağa Gözat

Merge pull request #123 from aznashwan/worker-key-serialization

Make writer key serialization/deserialization more robust.
Nashwan Azhari 6 yıl önce
ebeveyn
işleme
d7dd183222

+ 8 - 9
coriolis/conductor/rpc/server.py

@@ -1101,10 +1101,6 @@ class ConductorServerEndpoint(object):
             # if all the source disk snapshotting and worker setup steps are
             # performed by the source plugin in REPLICATE_DISKS.
             # This should no longer be a problem when worker pooling lands.
-            # Alternatively, REPLICATE_DISKS could be modfied to re-use the
-            # resources deployed during 'DEPLOY_SOURCE_RESOURCES'.
-            # These are currently not even passed to REPLICATE_DISKS (just
-            # their connection info), and should be fixed later.
             last_migration_task = None
             migration_resources_tasks = [
                 deploy_migration_source_resources_task.id,
@@ -1147,6 +1143,7 @@ class ConductorServerEndpoint(object):
             deploy_instance_task = self._create_task(
                 instance, constants.TASK_TYPE_DEPLOY_INSTANCE_RESOURCES,
                 execution, depends_on=[
+                    last_migration_task.id,
                     delete_destination_resources_task.id])
 
             depends_on = [deploy_instance_task.id]
@@ -1920,8 +1917,9 @@ class ConductorServerEndpoint(object):
                 "Received confirmation that presumably cancelling task '%s' "
                 "(status '%s') has just completed successfully. "
                 "This should have never happened and indicates that its worker "
-                "host ('%s') has failed to cancel it properly. Please "
-                "check the worker logs for more details. "
+                "host ('%s') has either failed to cancel it properly, or it "
+                "was completed before the cancellation request was received. "
+                "Please check the worker logs for more details. "
                 "Marking as %s and processing its result as if it completed "
                 "successfully.",
                 task.id, task.status, task.host,
@@ -1929,10 +1927,11 @@ class ConductorServerEndpoint(object):
             db_api.set_task_status(
                 ctxt, task_id, constants.TASK_STATUS_CANCELED_AFTER_COMPLETION,
                 exception_details=(
-                    "The worker host for this task ('%s') has failed at "
-                    "cancelling it so this task was unintentionally run to "
+                    "The worker host for this task ('%s') has either failed "
+                    "at cancelling it or the cancellation request arrived "
+                    "after it was already completed so this task was run to "
                     "completion. Please review the worker logs for "
-                    "more relevant details and contact support." % (
+                    "more relevant details." % (
                         task.host)))
         elif task.status in constants.FINALIZED_TASK_STATUSES:
             LOG.error(

+ 31 - 8
coriolis/providers/backup_writers.py

@@ -93,6 +93,24 @@ def _disable_lvm2_lvmetad(ssh):
             ssh, "sudo vgchange -an", get_pty=True)
 
 
+def _check_deserialize_key(key):
+    res = None
+    if isinstance(key, paramiko.RSAKey):
+        LOG.trace("Key is already in the proper format.")
+        res = key
+    elif type(key) is str:
+        LOG.trace("Deserializing PEM-encoded private key.")
+        res = utils.deserialize_key(
+            key, CONF.serialization.temp_keypair_password)
+    else:
+        raise exception.CoriolisException(
+            "Private key must be either a PEM-encoded string or "
+            "a paramiko.RSAKey instance. Got type '%s'." % (
+                type(key)))
+
+    return res
+
+
 class BackupWritersFactory(object):
 
     def __init__(self, writer_connection_info, volumes_info):
@@ -431,11 +449,7 @@ class SSHBackupWriter(BaseBackupWriter):
                 "Either pkey or password are required")
 
         if pkey:
-            if type(pkey) is not str:
-                raise exception.CoriolisException(
-                    "pkey must be a PEM encoded RSA private key")
-            pkey = utils.deserialize_key(
-                pkey, CONF.serialization.temp_keypair_password)
+            pkey = _check_deserialize_key(pkey)
 
         return cls(ip, port, username, pkey, password, volumes_info)
 
@@ -729,8 +743,7 @@ class HTTPBackupWriterBoostrapper(object):
             raise exception.CoriolisException(
                 "Either password or pkey are required")
         if self._pkey:
-            self._pkey = utils.deserialize_key(
-                self._pkey, CONF.serialization.temp_keypair_password)
+            self._pkey = _check_deserialize_key(self._pkey)
         self._ssh = self._connect_ssh()
 
     @utils.retry_on_error(sleep_seconds=30)
@@ -915,12 +928,22 @@ class HTTPBackupWriter(BaseBackupWriter):
         """
         ip = conn_info.get("ip")
         port = conn_info.get("port")
-        certs = conn_info.get("certificates")
+        certs = conn_info.get("certificates", {})
 
         required = ["ip", "port", "certificates"]
         if not all([ip, port, certs]):
             raise exception.CoriolisException(
                 "Missing required connection info: %s" % ", ".join(required))
+
+        required_cert_options = ["client_crt", "client_key", "ca_crt"]
+        missing_cert_options = [
+            opt for opt in required_cert_options
+            if opt not in certs]
+        if missing_cert_options:
+            raise exception.CoriolisException(
+                "Missing the following HTTPBackupWriter fields from the "
+                "'certificates' options: %s" % missing_cert_options)
+
         return cls(ip, port, volumes_info, certs)
 
     def __del__(self):

+ 9 - 5
coriolis/providers/replicator.py

@@ -459,13 +459,17 @@ class Replicator(object):
         port = conn_info.get('port', 22)
         password = conn_info.get('password', None)
         pkey = conn_info.get('pkey', None)
-        for i in required:
-            if conn_info.get(i) is None:
-                raise exception.CoriolisException(
-                    "missing required field: %s" % i)
+        missing = [
+            field for field in required
+            if field not in conn_info]
+        if missing:
+            raise exception.CoriolisException(
+                "Missing some required fields from source replication "
+                "worker VM connection info: %s" % missing)
         if any([password, pkey]) is False:
             raise exception.CoriolisException(
-                "Either password or pkey is required")
+                "Either 'password' or 'pkey' for source worker VM is required "
+                "to initialize the Coriolis replicator.")
 
         if pkey:
             if type(pkey) is str:

+ 5 - 0
coriolis/schemas.py

@@ -38,6 +38,8 @@ _CORIOLIS_DISK_SYNC_RESOURCES_INFO_SCHEMA_NAME = (
     "disk_sync_resources_info_schema.json")
 _CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA_NAME = (
     "disk_sync_resources_conn_info_schema.json")
+_CORIOLIS_REPLICATION_WORKER_CONN_INFO_SCHEMA_NAME = (
+    "replication_worker_conn_info_schema.json")
 
 
 def get_schema(package_name, schema_name,
@@ -128,3 +130,6 @@ CORIOLIS_DISK_SYNC_RESOURCES_INFO_SCHEMA = get_schema(
 
 CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA = get_schema(
     __name__, _CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA_NAME)
+
+CORIOLIS_REPLICATION_WORKER_CONN_INFO_SCHEMA = get_schema(
+    __name__, _CORIOLIS_REPLICATION_WORKER_CONN_INFO_SCHEMA_NAME)

+ 40 - 21
coriolis/schemas/disk_sync_resources_conn_info_schema.json

@@ -1,31 +1,50 @@
 {
   "$schema": "http://cloudbase.it/coriolis/schemas/disk_sync_resources_conn_info#",
   "type": "object",
-  "description": "Object defining SSH connection parameters for a temporary minion VM which will carry out disk syncing and/or Linux OSMorphing",
+  "description": "Object defining SSH connection parameters for a temporary minion VM which will carry out disk syncing.",
   "properties": {
-    "ip": {
-      "type": "string"
+    "backend": {
+      "enum": ["ssh_backup_writer", "http_backup_writer", "file_backup_writer"]
     },
-    "port": {
-      "type": "integer"
-    },
-    "username": {
-      "type": "string"
-    },
-    "password": {
-      "$ref": "#/definitions/nullableString"
-    },
-    "pkey": {
-      "$ref": "#/definitions/nullableString"
-    },
-    "cert_pem": {
-      "type": "string"
-    },
-    "cert_key_pem": {
-      "type": "string"
+    "connection_details": {
+      "ip": {
+        "type": "string"
+      },
+      "port": {
+        "type": "integer"
+      },
+      "username": {
+        "type": "string"
+      },
+      "password": {
+        "$ref": "#/definitions/nullableString"
+      },
+      "pkey": {
+        "$ref": "#/definitions/nullableString"
+      },
+      "certificates": {
+        "type": "object",
+        "description": "Certificates for the HTTP-based backup writer.",
+        "properties": {
+          "client_crt": {
+            "type": "string",
+            "description": "PEM-encoded client certificate."
+          },
+          "client_key": {
+            "type": "string",
+            "description": "PEM-encoded client private key."
+          },
+          "ca_crt": {
+            "type": "string",
+            "description": "PEM-encoded vertificate of the authority used to validate the server."
+          }
+        },
+        "required": ["client_crt", "client_key", "ca_crt"]
+      },
+      "required": ["ip", "port"]
     }
   },
-  "required": ["ip", "port", "username"],
+  "required": ["backend", "connection_details"],
   "definitions": {
     "nullableString": {
       "oneOf": [{

+ 32 - 0
coriolis/schemas/replication_worker_conn_info_schema.json

@@ -0,0 +1,32 @@
+{
+  "$schema": "http://cloudbase.it/coriolis/schemas/replication_worker_conn_info#",
+  "type": "object",
+  "description": "Object defining SSH connection parameters for the source-side temporary minion VM which will carry out disk exports using Coriolis' in-built data replication engine.",
+  "properties": {
+    "ip": {
+      "type": "string"
+    },
+    "port": {
+      "type": "integer"
+    },
+    "username": {
+      "type": "string"
+    },
+    "password": {
+      "$ref": "#/definitions/nullableString"
+    },
+    "pkey": {
+      "$ref": "#/definitions/nullableString"
+    }
+  },
+  "required": ["ip", "port", "username"],
+  "definitions": {
+    "nullableString": {
+      "oneOf": [{
+        "type": "string"
+      }, {
+        "type": "null"
+      }]
+    }
+  }
+}

+ 16 - 10
coriolis/tasks/base.py

@@ -129,21 +129,27 @@ def get_connection_info(ctxt, data):
     return utils.get_secret_connection_info(ctxt, connection_info)
 
 
-def marshal_migr_conn_info(migr_connection_info):
-    if migr_connection_info and "pkey" in migr_connection_info:
+def marshal_migr_conn_info(
+        migr_connection_info, private_key_field_name="pkey"):
+    if migr_connection_info and (
+            private_key_field_name in migr_connection_info):
         migr_connection_info = migr_connection_info.copy()
-        pkey = migr_connection_info["pkey"]
+        pkey = migr_connection_info[private_key_field_name]
         if isinstance(pkey, str) is False:
-            migr_connection_info["pkey"] = utils.serialize_key(
-                pkey, CONF.serialization.temp_keypair_password)
+            migr_connection_info[private_key_field_name] = (
+                utils.serialize_key(
+                    pkey, CONF.serialization.temp_keypair_password))
     return migr_connection_info
 
 
-def unmarshal_migr_conn_info(migr_connection_info):
-    if migr_connection_info and "pkey" in migr_connection_info:
+def unmarshal_migr_conn_info(
+        migr_connection_info, private_key_field_name="pkey"):
+    if migr_connection_info and (
+            private_key_field_name in migr_connection_info):
         migr_connection_info = migr_connection_info.copy()
-        pkey_str = migr_connection_info["pkey"]
+        pkey_str = migr_connection_info[private_key_field_name]
         if isinstance(pkey_str, paramiko.rsakey.RSAKey) is False:
-            migr_connection_info["pkey"] = utils.deserialize_key(
-                pkey_str, CONF.serialization.temp_keypair_password)
+            migr_connection_info[private_key_field_name] = (
+                utils.deserialize_key(
+                    pkey_str, CONF.serialization.temp_keypair_password))
     return migr_connection_info

+ 2 - 0
coriolis/tasks/osmorphing_tasks.py

@@ -105,6 +105,8 @@ class DeployOSMorphingResourcesTask(base.TaskRunner):
                 "Target provider '%s' did NOT return any "
                 "'osmorphing_connection_info'." % (
                     destination["type"]))
+        osmorphing_connection_info = base.marshal_migr_conn_info(
+            osmorphing_connection_info)
 
         os_morphing_info = import_info.get("osmorphing_info", {})
         if not os_morphing_info:

+ 26 - 6
coriolis/tasks/replica_tasks.py

@@ -148,11 +148,18 @@ class ReplicateDisksTask(base.TaskRunner):
         if migr_source_conn_info:
             schemas.validate_value(
                 migr_source_conn_info,
-                schemas.CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA)
-        migr_source_conn_info = base.unmarshal_migr_conn_info(
-            migr_source_conn_info)
+                schemas.CORIOLIS_REPLICATION_WORKER_CONN_INFO_SCHEMA)
+            migr_source_conn_info = base.unmarshal_migr_conn_info(
+                migr_source_conn_info)
 
         migr_target_conn_info = task_info["target_resources_connection_info"]
+        if migr_target_conn_info:
+            schemas.validate_value(
+                migr_target_conn_info,
+                schemas.CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA)
+            migr_target_conn_info['connection_details'] = (
+                base.unmarshal_migr_conn_info(
+                    migr_target_conn_info['connection_details']))
         incremental = task_info.get("incremental", True)
 
         source_environment = task_info['source_environment']
@@ -282,7 +289,7 @@ class DeployReplicaSourceResourcesTask(base.TaskRunner):
                     migr_connection_info)
                 schemas.validate_value(
                     migr_connection_info,
-                    schemas.CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA,
+                    schemas.CORIOLIS_REPLICATION_WORKER_CONN_INFO_SCHEMA,
                     # NOTE: we avoid raising so that the cleanup task
                     # can [try] to deal with the temporary resources.
                     raise_on_error=False)
@@ -385,8 +392,21 @@ class DeployReplicaTargetResourcesTask(base.TaskRunner):
                     migr_connection_info, None).get_writer()
             except Exception as err:
                 LOG.warn(
-                    "Seemingly invalid connection info. Replica will likely "
-                    "fail during disk Replication. Error is: %s" % str(err))
+                    "Seemingly invalid backup writer conn info. Replica will "
+                    "likely fail during disk Replication. Error is: %s" % (
+                        str(err)))
+
+            if migr_connection_info:
+                if 'connection_details' in migr_connection_info:
+                    migr_connection_info['connection_details'] = (
+                        base.marshal_migr_conn_info(
+                            migr_connection_info['connection_details']))
+                schemas.validate_value(
+                    migr_connection_info,
+                    schemas.CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA,
+                    # NOTE: we avoid raising so that the cleanup task
+                    # can [try] to deal with the temporary resources.
+                    raise_on_error=False)
         else:
             LOG.warn(
                 "Replica target provider for '%s' did NOT return any "

+ 1 - 1
coriolis/worker/rpc/server.py

@@ -220,7 +220,7 @@ class WorkerServerEndpoint(object):
 
             LOG.info(
                 "Output of completed %s task with ID %s: %s",
-                task_type, task_type,
+                task_type, task_id,
                 utils.sanitize_task_info(task_result))
 
             self._rpc_conductor_client.task_completed(