Просмотр исходного кода

Added backup writer factory

  * add backup writer factory
  * require that the pkey returned by the providers be a PEM
    encoded RSA private key.
Gabriel-Adrian Samfira 6 лет назад
Родитель
Сommit
8cba2eaa12

+ 201 - 15
coriolis/providers/backup_writers.py

@@ -10,6 +10,7 @@ import tempfile
 import threading
 import time
 import uuid
+import shutil
 
 import eventlet
 from oslo_config import cfg
@@ -34,6 +35,15 @@ CONF.register_opts(opts)
 _CORIOLIS_HTTP_WRITER_CMD = "coriolis-writer"
 
 LOG = logging.getLogger(__name__)
+BACKUP_WRITER_SSH = "ssh_backup_writer"
+BACKUP_WRITER_HTTP = "http_backup_writer"
+BACKUP_WRITER_FILE = "file_backup_writer"
+
+BACKUP_WRITERS = [
+    BACKUP_WRITER_SSH,
+    BACKUP_WRITER_HTTP,
+    BACKUP_WRITER_FILE
+]
 
 _WRITER_ERR_MAP = {
     -1: "ERR_MORE_MSG",
@@ -83,6 +93,44 @@ def _disable_lvm2_lvmetad(ssh):
             ssh, "sudo vgchange -an", get_pty=True)
 
 
+class BackupWritersFactory(object):
+
+    def __init__(self, writer_connection_info, volumes_info):
+        self._validate_info(writer_connection_info)
+        self._type = writer_connection_info["backend"]
+        self._conn_info = writer_connection_info["connection_details"]
+        self._volumes_info = volumes_info
+
+    def get_writer(self):
+        if self._type == BACKUP_WRITER_SSH:
+            return SSHBackupWriter.from_connection_info(
+                self._conn_info, self._volumes_info)
+        elif self._type == BACKUP_WRITER_HTTP:
+            return HTTPBackupWriter.from_connection_info(
+                self._conn_info, self._volumes_info)
+        elif self._type == BACKUP_WRITER_FILE:
+            return FileBackupWriter.from_connection_info(
+                self._conn_info, self._volumes_info)
+        raise exception.CoriolisException(
+            "Invalid backup writer type: %s" % self._type)
+
+    def _validate_info(self, info):
+        if type(info) is not dict:
+            raise exception.CoriolisException(
+                "Invalid backup writer connection info.")
+        wrt_type = info.get("backend", None)
+        if wrt_type is None:
+            raise exception.CoriolisException(
+                "Missing backend name in connection info")
+        if wrt_type not in BACKUP_WRITERS:
+            raise exception.CoriolisException(
+                "Invalid backup writer type: %s" % wrt_type)
+        wrt_conn_info = info.get("connection_details")
+        if wrt_conn_info is None:
+            raise exception.CoriolisException(
+                "Missing credentials in connection info")
+
+
 class BaseBackupWriterImpl(with_metaclass(abc.ABCMeta)):
     def __init__(self, path, disk_id):
         self._path = path
@@ -132,6 +180,11 @@ class BaseBackupWriter(with_metaclass(abc.ABCMeta)):
             if impl:
                 impl.close()
 
+    @classmethod
+    @abc.abstractmethod
+    def from_connection_info(cls, info, volumes_info):
+        pass
+
 
 class FileBackupWriterImpl(BaseBackupWriterImpl):
     def __init__(self, path, disk_id):
@@ -161,6 +214,10 @@ class FileBackupWriter(BaseBackupWriter):
     def _get_impl(self, path, disk_id):
         return FileBackupWriterImpl(path, disk_id)
 
+    @classmethod
+    def from_connection_info(cls, info, volumes_info):
+        return cls()
+
 
 class SSHBackupWriterImpl(BaseBackupWriterImpl):
     def __init__(self, path, disk_id, compress_transfer=None,
@@ -356,6 +413,32 @@ class SSHBackupWriter(BaseBackupWriter):
         self._ssh = None
         self._lock = threading.Lock()
 
+    @classmethod
+    def from_connection_info(cls, info, volumes_info):
+        required = ["ip", "port", "username"]
+        ip = info.get("ip")
+        port = info.get("port")
+        username = info.get("username")
+        pkey = info.get("pkey")
+        password = info.get("password")
+
+        if not all([ip, port, username]):
+            raise exception.CoriolisException(
+                "Connection info is invalid for SSHBackupWriter. "
+                "The following fields are required: %s" % ", ".join(required))
+        if pkey is None and password is None:
+            raise exception.CoriolisException(
+                "Either pkey or password are required")
+
+        if pkey:
+            if type(pkey) is not str:
+                raise exception.CoriolisException(
+                    "pkey must be a PEM encoded RSA private key")
+            pkey = utils.deserialize_key(
+                pkey, CONF.serialization.temp_keypair_password)
+
+        return cls(ip, port, username, pkey, password, volumes_info)
+
     def _get_impl(self, path, disk_id):
         ssh = self._connect_ssh()
         _disable_lvm2_lvmetad(ssh)
@@ -439,6 +522,8 @@ class HTTPBackupWriterImpl(BaseBackupWriterImpl):
         self._compressor_evt = None
 
         self._compress_transfer = compress_transfer
+        if self._compress_transfer is None:
+            self._compress_transfer = CONF.compress_transfers
         super(HTTPBackupWriterImpl, self).__init__(path, disk_id)
 
     def _set_info(self, info):
@@ -540,7 +625,6 @@ class HTTPBackupWriterImpl(BaseBackupWriterImpl):
     def _sender(self):
         while True:
             payload = self._sender_q.get()
-            LOG.debug("Got send payload")
             headers = {
                 "X-Write-Offset": str(payload["offset"]),
                 "X-Client-Token": self._id,
@@ -628,7 +712,7 @@ class HTTPBackupWriterImpl(BaseBackupWriterImpl):
 class HTTPBackupWriter(BaseBackupWriter):
 
     def __init__(self, ip, port, volumes_info, ssh=None,
-                 compressor_count=3, writer_creds=None):
+                 compressor_count=3, certificates=None):
         self._ip = ip
         self._port = port
         self._volumes_info = volumes_info
@@ -639,14 +723,112 @@ class HTTPBackupWriter(BaseBackupWriter):
         self._writer_cmd = os.path.join(
             "/usr/bin", _CORIOLIS_HTTP_WRITER_CMD)
         self._ssh = ssh
-        self._writer_creds = writer_creds
+        self._certificates = certificates
         self._crt_dir = tempfile.mkdtemp()
-        if not self._ssh and not self._writer_creds:
+        if not self._ssh and not self._certificates:
             raise exception.CoriolisException(
                 "Either ssh or writer_creds must be specified")
-        if self._ssh and self._writer_creds:
+        if self._ssh and self._certificates:
             raise exception.CoriolisException(
                 "ssh and writer_creds are mutually exclusive")
+        self._cert_paths = None
+
+    @classmethod
+    def from_connection_info(cls, conn_info, volumes_info):
+        """Instantiate a HTTP backup writer from connection info.
+
+        Connection info has the following schema:
+
+        {
+            # IP address or hostname where we can reach the backup writer
+            "ip": "192.168.0.1",
+            # Backup writer port
+            "port": 4433,
+
+            # ssh_conn_info is only needed in environments where the HTTP
+            # backup writer cannot be set up via userdata. This can be
+            # used to copy the coriolis-writer binary, create certificates
+            # and initialize everything. This option and the "certificates"
+            # option are mutually exclusive. Set to None if the writer has
+            # already been set up via userdata, or other means.
+            "ssh_conn_info": {
+                # The IP here is usually the same as the IP/hostname of the
+                # backup writer, shown above.
+                "ip": "192.168.0.1",
+                # SSH port. Defaults to 22.
+                "port": 22,
+                "username": "example",
+                # Password is only needed if no pkey is specified
+                "password": "super secret password",
+                # pkey can be omited if password is used. Pkey must be a
+                # PEM formatted OpenSSH private key, paramiko.RSA private key
+                # or None
+                "pkey": "RSA_PRIVATE_KEY"
+            },
+
+            # The "certificates" field and the "ssh_conn_info" are mutually
+            # exclusive. Either the writer is set up via userdata, or we set
+            # it up ourselves using a SSH connection. But whether or not it's
+            # set up must be unambiguous. Setting this option means we already
+            # have the certificates, and the coriolis-writer binary is already
+            # set up and started
+            "certificates": {
+                # PEM encoded client certificate
+                "client_crt": "",
+                # PEM encoded client private key
+                "client_key": "",
+                # PEM encoded CA certificate we use to validate the server
+                "ca_crt": ""
+            }
+        }
+        """
+        ip = conn_info.get("ip")
+        port = conn_info.get("port")
+
+        required = ["ip", "port"]
+        if not all([ip, port]):
+            raise exception.CoriolisException(
+                "Missing required connection info: %s" % ", ".join(required))
+
+        ssh_cli = None
+        ssh = conn_info.get("ssh_conn_info")
+        certs = conn_info.get("certificates")
+        if ssh:
+            if type(ssh) is not dict:
+                raise exception.CoriolisException(
+                    "ssh connection info is invalid")
+            ssh_ip = ssh.get("ip")
+            ssh_port = ssh.get("port", 22)
+            ssh_username = ssh.get("username")
+            if not all([ssh_ip, ssh_port, ssh_username]):
+                raise exception.CoriolisException(
+                    "Missing required fields in SSH conn info. "
+                    "Required fields are: %s" % ", ".join(
+                        ["ip", "port", "username"]))
+            password = ssh.get("password")
+            pkey = ssh.get("pkey")
+            if pkey is not None:
+                if type(pkey) is not str:
+                    raise exception.CoriolisException(
+                        "pkey must be a PEM encoded RSA private key")
+                pkey = utils.deserialize_key(
+                    pkey, CONF.serialization.temp_keypair_password)
+            ssh_cli = paramiko.SSHClient()
+            ssh_cli.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+            utils.retry_on_error(sleep_seconds=10)(ssh_cli.connect)(
+                hostname=ssh_ip,
+                port=ssh_port,
+                username=ssh_username,
+                pkey=pkey,
+                password=password)
+        return cls(ip, port, volumes_info, ssh=ssh_cli, certificates=certs)
+
+    def __del__(self):
+        if self._crt_dir and os.path.isdir(self._crt_dir):
+            try:
+                shutil.rmtree(self._crt_dir)
+            except BaseException:
+                pass
 
     def _wait_for_conn(self):
         LOG.debug(
@@ -662,23 +844,27 @@ class HTTPBackupWriter(BaseBackupWriter):
             "-j ACCEPT" % self._writer_port, get_pty=True)
 
     def _write_cert_files(self):
-        if not self._writer_creds:
+        if not self._certificates:
             raise exception.CoriolisException(
-                "writer_creds not set")
-        crt_file = tempfile.mkstemp()[1]
-        key_file = tempfile.mkstemp()[1]
-        ca_crt_file = tempfile.mkstemp()[1]
+                "certificates not set")
+        if self._cert_paths:
+            return self._cert_paths
+
+        crt_file = tempfile.mkstemp(dir=self._crt_dir)[1]
+        key_file = tempfile.mkstemp(dir=self._crt_dir)[1]
+        ca_crt_file = tempfile.mkstemp(dir=self._crt_dir)[1]
         with open(crt_file, "w") as fd:
-            fd.write(self._writer_creds["client_crt"])
+            fd.write(self._certificates["client_crt"])
         with open(key_file, "w") as fd:
-            fd.write(self._writer_creds["client_key"])
+            fd.write(self._certificates["client_key"])
         with open(ca_crt_file, "w") as fd:
-            fd.write(self._writer_creds["ca_crt"])
-        return {
+            fd.write(self._certificates["ca_crt"])
+        self._cert_paths = {
             "client_crt": crt_file,
             "client_key": key_file,
             "ca_crt": ca_crt_file,
         }
+        return self._cert_paths
 
     def _get_impl(self, path, disk_id):
         if self._ssh:
@@ -686,6 +872,7 @@ class HTTPBackupWriter(BaseBackupWriter):
             cert_paths = self._setup_writer(self._ssh)["local"]
         else:
             cert_paths = self._write_cert_files()
+        self._wait_for_conn()
 
         path = [v for v in self._volumes_info
                 if v["disk_id"] == disk_id][0]["volume_dev"]
@@ -817,7 +1004,6 @@ class HTTPBackupWriter(BaseBackupWriter):
         utils.create_service(
             ssh, cmdline, _CORIOLIS_HTTP_WRITER_CMD, start=True)
         self._inject_iptables_allow(ssh)
-        self._wait_for_conn()
 
     def _setup_writer(self, ssh):
         self._copy_writer(ssh)

+ 1 - 3
coriolis/providers/factory.py

@@ -1,8 +1,7 @@
-# Copyright 2016 Cloudbase :Solutions Srl
+# Copyright 2016 Cloudbase Solutions Srl
 # All Rights Reserved.
 
 from oslo_config import cfg
-from oslo_log import log as logging
 
 from coriolis import constants
 from coriolis import exception
@@ -15,7 +14,6 @@ serialization_opts = [
                 help='List of provider class paths'),
 ]
 
-LOG = logging.getLogger(__name__)
 CONF = cfg.CONF
 CONF.register_opts(serialization_opts)
 

+ 0 - 1
coriolis/secrets.py

@@ -10,7 +10,6 @@ from coriolis import utils
 
 
 def get_secret(ctxt, secret_ref):
-    keystone.create_trust(ctxt)
     session = keystone.create_keystone_session(ctxt)
     barbican = barbican_client.Client(session=session)
     sec = utils.retry_on_error()(barbican.secrets.get)(secret_ref)

+ 17 - 24
coriolis/tasks/replica_tasks.py

@@ -7,6 +7,7 @@ from coriolis import constants
 from coriolis import events
 from coriolis import exception
 from coriolis.providers import factory as providers_factory
+from coriolis.providers import backup_writers
 from coriolis import schemas
 from coriolis.tasks import base
 from coriolis import utils
@@ -112,20 +113,14 @@ class ReplicateDisksTask(base.TaskRunner):
             schemas.CORIOLIS_DISK_SYNC_RESOURCES_INFO_SCHEMA)
 
         migr_source_conn_info = task_info["migr_source_connection_info"]
-        #if migr_source_conn_info:
-        #    schemas.validate_value(
-        #        migr_source_conn_info,
-        #        schemas.CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA)
+        if migr_source_conn_info:
+            schemas.validate_value(
+                migr_source_conn_info,
+                schemas.CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA)
         migr_source_conn_info = base.unmarshal_migr_conn_info(
             migr_source_conn_info)
 
         migr_target_conn_info = task_info["migr_target_connection_info"]
-        #schemas.validate_value(
-        #    migr_target_conn_info,
-        #    schemas.CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA)
-        migr_target_conn_info = base.unmarshal_migr_conn_info(
-            migr_target_conn_info)
-
         incremental = task_info.get("incremental", True)
 
         source_environment = origin.get('source_environment') or {}
@@ -220,12 +215,12 @@ class DeployReplicaSourceResourcesTask(base.TaskRunner):
         if migr_connection_info:
             migr_connection_info = base.marshal_migr_conn_info(
                 migr_connection_info)
-            #schemas.validate_value(
-            #    migr_connection_info,
-            #    schemas.CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA,
-            #    # NOTE: we avoid raising so that the cleanup task
-            #    # can [try] to deal with the temporary resources.
-            #    raise_on_error=False)
+            schemas.validate_value(
+                migr_connection_info,
+                schemas.CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA,
+                # NOTE: we avoid raising so that the cleanup task
+                # can [try] to deal with the temporary resources.
+                raise_on_error=False)
 
         task_info["migr_source_connection_info"] = migr_connection_info
 
@@ -284,14 +279,12 @@ class DeployReplicaTargetResourcesTask(base.TaskRunner):
             "migr_resources"]
 
         migr_connection_info = replica_resources_info["connection_info"]
-        migr_connection_info = base.marshal_migr_conn_info(
-            migr_connection_info)
-        #schemas.validate_value(
-        #    migr_connection_info,
-        #    schemas.CORIOLIS_DISK_SYNC_RESOURCES_CONN_INFO_SCHEMA,
-        #    # NOTE: we avoid raising so that the cleanup task
-        #    # can [try] to deal with the temporary resources.
-        #    raise_on_error=False)
+        try:
+            backup_writers.BackupWritersFactory(
+                migr_connection_info, None).get_writer()
+        except BaseException as err:
+            LOG.exception(
+                "Invalid connection info: %s" % err)
 
         task_info["migr_target_connection_info"] = migr_connection_info