Просмотр исходного кода

Add HTTP backup writer

Leverage a HTTP based backup writer. The HTTP backup writer has the
added benefit of requiring less resources for large chunks, as it
does not need to rely on struct.pack to generate the payload it
sends over the wire. Standard HTTP calls are made to a remote API.

It also does not require a live, SSH channel to be active, and
recovery from failure is more likely to succeed. Removing the reliance
on paramiko also means that we are no longer constrained by the fact
that encryption of data over SSH done by paramiko is single threaded
and limited by the power of a single core available to the coriolis
worker.
Gabriel Adrian Samfira 6 лет назад
Родитель
Сommit
45bbd96ee5
2 измененных файлов с 413 добавлено и 0 удалено
  1. 413 0
      coriolis/providers/backup_writers.py
  2. BIN
      coriolis/resources/coriolis-writer

+ 413 - 0
coriolis/providers/backup_writers.py

@@ -5,13 +5,19 @@ import abc
 import contextlib
 import contextlib
 import errno
 import errno
 import os
 import os
+import tempfile
+import time
 import threading
 import threading
+import uuid
 
 
+import eventlet
 from oslo_config import cfg
 from oslo_config import cfg
 from oslo_log import log as logging
 from oslo_log import log as logging
 import paramiko
 import paramiko
+import requests
 from six import with_metaclass
 from six import with_metaclass
 
 
+from coriolis import constants
 from coriolis import data_transfer
 from coriolis import data_transfer
 from coriolis import exception
 from coriolis import exception
 from coriolis import utils
 from coriolis import utils
@@ -24,6 +30,7 @@ opts = [
                 help='Use compression if possible during disk transfers'),
                 help='Use compression if possible during disk transfers'),
 ]
 ]
 CONF.register_opts(opts)
 CONF.register_opts(opts)
+_CORIOLIS_HTTP_WRITER_CMD = "coriolis-writer"
 
 
 
 
 class BaseBackupWriterImpl(with_metaclass(abc.ABCMeta)):
 class BaseBackupWriterImpl(with_metaclass(abc.ABCMeta)):
@@ -250,3 +257,409 @@ class SSHBackupWriter(BaseBackupWriter):
             pkey=self._pkey,
             pkey=self._pkey,
             password=self._password)
             password=self._password)
         return ssh
         return ssh
+
+
+class HTTPBackupWriterImpl(BaseBackupWriterImpl):
+    def __init__(self, path, disk_id, compress_transfer=None):
+        self._offset = None
+        self._session = None
+        self._ip = None
+        self._port = None
+        self._crt = None
+        self._key = None
+        self._ca = None
+        self._closing = False
+        self._write_error = False
+        self._id = None
+        self._exception = None
+        self._profile = [{
+            "compress_time": None,
+            "write_time": None,
+            "transferred_compressed": None,
+            "transferred_inflated": None,
+        }]
+        self._comp_q = eventlet.Queue(maxsize=5)
+        self._sender_q = eventlet.Queue(maxsize=5)
+
+        self._sender_evt = None
+        self._compressor_evt = None
+
+        self._compress_transfer = compress_transfer
+        if self._compress_transfer is None:
+            self._compress_transfer = CONF.compress_transfers
+        super(HTTPBackupWriterImpl, self).__init__(path, disk_id)
+
+    def _set_info(self, info):
+        self._ip = info.get("ip")
+        self._port = info.get("port")
+        self._crt = info.get("client_crt")
+        self._key = info.get("client_key")
+        self._ca = info.get("ca_crt")
+        self._id = info.get("id")
+        if not all([self._ip, self._port, self._crt,
+                    self._key, self._ca, self._id]):
+            raise exception.CoriolisException(
+                "Missing required info when creating HTTPBackupWriter")
+
+    @property
+    def _uri(self):
+        return "https://%s:%s/api/v1%s" % (
+            self._ip, self._port, self._path
+        )
+
+    @utils.retry_on_error()
+    def _acquire(self):
+        self._ensure_session()
+        uri = "%s/acquire" % self._uri
+        headers = {"X-Client-Token": self._id}
+        resp = self._session.get(uri, headers=headers)
+        LOG.debug("Returned code: %d. Msg: %s" % (
+            resp.status_code, resp.content))
+        resp.raise_for_status()
+
+    @utils.retry_on_error()
+    def _release(self):
+        self._ensure_session()
+        uri = "%s/release" % self._uri
+        headers = {"X-Client-Token": self._id}
+        resp = self._session.get(uri, headers=headers)
+        LOG.debug("Returned code: %d. Msg: %s" %
+                  (resp.status_code, resp.content))
+        resp.raise_for_status()
+
+    def _init_session(self):
+        if self._session:
+            self._session.close()
+        sess = requests.Session()
+        sess.cert = (
+            self._crt,
+            self._key)
+        sess.verify = self._ca
+        self._session = sess
+
+    def _open(self):
+        self._closing = False
+        self._init_session()
+        self._acquire()
+        self._sender_evt = eventlet.spawn(self._sender)
+        self._compressor_evt = [
+            eventlet.spawn(self._compressor),
+            eventlet.spawn(self._compressor),
+            eventlet.spawn(self._compressor)]
+
+    def seek(self, pos):
+        self._offset = pos
+
+    def truncate(self, size):
+        pass
+
+    def _ensure_session(self):
+        if not self._session:
+            self._init_session()
+            return
+        if self._write_error:
+            self._init_session()
+            return
+
+    def _compressor(self):
+        while True:
+            payload = self._comp_q.get()
+            send_payload = {
+                "encoding": None,
+                "offset": payload["offset"],
+            }
+            chunk = payload["data"]
+            if self._compress_transfer:
+                try:
+                    chunk, compressed = data_transfer.compression_proxy(
+                        chunk, constants.COMPRESSION_FORMAT_GZIP)
+                    if compressed:
+                        send_payload["encoding"] = 'gzip'
+                except Exception as err:
+                    LOG.exception(err)
+                    self._exception = err
+                    raise
+            send_payload["chunk"] = chunk
+            self._sender_q.put(send_payload)
+            self._comp_q.task_done()
+
+    def _sender(self):
+        while True:
+            payload = self._sender_q.get()
+            headers = {
+                "X-Write-Offset": str(payload["offset"]),
+                "X-Client-Token": self._id,
+            }
+            if payload.get("encoding", None):
+                headers["content-encoding"] = payload["encoding"]
+
+            @utils.retry_on_error()
+            def send():
+                self._ensure_session()
+                resp = self._session.post(
+                    self._uri, headers=headers, data=payload["chunk"]
+                )
+                LOG.debug(
+                    "Response code: %r, content: %r" %
+                    (resp.status_code, resp.content))
+                try:
+                    resp.raise_for_status()
+                    self._write_error = False
+                except Exception as err:
+                    LOG.warning(
+                        "Error writing chunk to disk %s at offset"
+                        " %s: %s" % (self._path, payload["offset"], err))
+                    self._write_error = True
+                    raise
+            try:
+                send()
+            except Exception as err:
+                # record the exception. We need to terminate
+                # the writer if this is set
+                LOG.exception(err)
+                self._exception = err
+                raise
+            self._sender_q.task_done()
+
+    @utils.retry_on_error()
+    def write(self, data):
+        if self._closing:
+            raise exception.CoriolisException(
+                "Attempted to write to a closed writer."
+            )
+        if self._exception:
+            raise exception.CoriolisException(self._exception)
+
+        payload = {
+            "offset": self._offset,
+            "data": data,
+        }
+        self._comp_q.put(payload)
+        self._offset += len(data)
+
+    def _wait_for_queues(self):
+        while (self._comp_q.unfinished_tasks or
+               self._sender_q.unfinished_tasks) and not self._exception:
+            # No error recorded, and we have tasks in the queue
+            LOG.info("Waiting for unfinished transfers to complete")
+            time.sleep(0.5)
+
+    def close(self):
+        self._closing = True
+        self._wait_for_queues()
+        if self._exception:
+            # There was an exception while writing. We still need to
+            # release the disk.
+            try:
+                self._release()
+            except Exception as err:
+                LOG.error("Failed to release disk %s: %s. Ignoring." % (
+                    self._path, err))
+            raise exception.CoriolisException(self._exception)
+
+        self._release()
+        if self._session:
+            self._session.close()
+            self._session = None
+        if self._sender_evt:
+            eventlet.kill(self._sender_evt)
+            self._sender_evt = None
+        if self._compressor_evt:
+            for i in self._compressor_evt:
+                eventlet.kill(i)
+            self._compressor_evt = None
+
+
+class HTTPBackupWriter(BaseBackupWriter):
+
+    def __init__(self, ip, port, username, pkey,
+                 password, writer_port, volumes_info, cert_dir):
+        self._ip = ip
+        self._port = port
+        self._username = username
+        self._pkey = pkey
+        self._password = password
+        self._volumes_info = volumes_info
+        self._writer_port = writer_port
+        self._lock = threading.Lock()
+        self._id = str(uuid.uuid4())
+        self._writer_cmd = os.path.join(
+            "/usr/bin", _CORIOLIS_HTTP_WRITER_CMD)
+        self._crt = None
+        self._key = None
+        self._ca = None
+        if os.path.isdir(cert_dir) is False:
+            raise exception.CoriolisException(
+                "Certificates dir %s does not exist" % cert_dir
+            )
+        self._crt_dir = cert_dir
+
+    def _wait_for_conn(self):
+        LOG.debug(
+            "waiting for coriolis-writer connectivity %s:%s" % (
+                self._ip, self._writer_port))
+        utils.wait_for_port_connectivity(
+            self._ip, self._writer_port)
+
+    def _inject_iptables_allow(self, ssh):
+        utils.exec_ssh_cmd(
+            ssh,
+            "sudo /sbin/iptables -I INPUT -p tcp --dport %s "
+            "-j ACCEPT" % self._writer_port)
+
+    def _get_impl(self, path, disk_id):
+        ssh = self._connect_ssh()
+        self._setup_writer(ssh)
+
+        path = [v for v in self._volumes_info
+                if v["disk_id"] == disk_id][0]["volume_dev"]
+        impl = HTTPBackupWriterImpl(path, disk_id)
+        impl._set_info({
+            "ip": self._ip,
+            "port": self._writer_port,
+            "client_crt": self._crt,
+            "client_key": self._key,
+            "ca_crt": self._ca,
+            "id": self._id,
+        })
+        return impl
+
+    @utils.retry_on_error()
+    def _copy_writer(self, ssh):
+        local_path = os.path.join(
+            utils.get_resources_dir(), _CORIOLIS_HTTP_WRITER_CMD)
+        remote_tmp_path = os.path.join("/tmp", _CORIOLIS_HTTP_WRITER_CMD)
+        with self._lock:
+            sftp = ssh.open_sftp()
+            try:
+                # Check if the remote file already exists
+                sftp.stat(self._writer_cmd)
+            except IOError as ex:
+                if ex.errno != errno.ENOENT:
+                    raise
+                sftp.put(local_path, remote_tmp_path)
+                utils.exec_ssh_cmd(
+                    ssh,
+                    "sudo mv %s %s" % (
+                        remote_tmp_path, self._writer_cmd),
+                    get_pty=True
+                )
+                utils.exec_ssh_cmd(
+                    ssh,
+                    "sudo chmod +x %s" % self._writer_cmd,
+                    get_pty=True
+                )
+            finally:
+                sftp.close()
+
+    def _fetch_remote_file(self, ssh, remote_file, local_file):
+        with open(local_file, 'wb') as fd:
+            utils.exec_ssh_cmd(
+                ssh,
+                "sudo chmod +r %s" % remote_file, get_pty=True)
+            data = utils.retry_on_error()(
+                utils.read_ssh_file)(ssh, remote_file)
+            fd.write(data)
+
+    def _setup_certificates(self, ssh):
+        remote_base_dir = "/etc/coriolis-writer"
+
+        ca_crt_name = "ca-cert.pem"
+        client_crt_name = "client-cert.pem"
+        client_key_name = "client-key.pem"
+
+        srv_crt_name = "srv-cert.pem"
+        srv_key_name = "srv-key.pem"
+
+        remote_ca_crt = os.path.join(remote_base_dir, ca_crt_name)
+        remote_client_crt = os.path.join(remote_base_dir, client_crt_name)
+        remote_client_key = os.path.join(remote_base_dir, client_key_name)
+        remote_srv_crt = os.path.join(remote_base_dir, srv_crt_name)
+        remote_srv_key = os.path.join(remote_base_dir, srv_key_name)
+
+        ca_crt = os.path.join(self._crt_dir, ca_crt_name)
+        client_crt = os.path.join(self._crt_dir, client_crt_name)
+        client_key = os.path.join(self._crt_dir, client_key_name)
+
+        exist = []
+        for i in (remote_ca_crt, remote_client_crt, remote_client_key,
+                  remote_srv_crt, remote_srv_key):
+            exist.append(utils.test_ssh_path(ssh, i))
+
+        force_fetch = False
+        if not all(exist):
+            utils.exec_ssh_cmd(
+                ssh, "sudo mkdir -p %s" % remote_base_dir, get_pty=True)
+            utils.exec_ssh_cmd(
+                ssh,
+                "sudo %(writer_cmd)s generate-certificates -output-dir "
+                "%(cert_dir)s -certificate-hosts %(extra_hosts)s" % {
+                    "writer_cmd": self._writer_cmd,
+                    "cert_dir": remote_base_dir,
+                    "extra_hosts": self._ip,
+                },
+                get_pty=True)
+            force_fetch = True
+
+        exists = []
+        for i in (ca_crt, client_crt, client_key):
+            exists.append(os.path.isfile(i))
+
+        if not all(exists) or force_fetch:
+            # certificates either are missing, or have been regenerated
+            # on the writer worker. We need to fetch them.
+            self._fetch_remote_file(ssh, remote_ca_crt, ca_crt)
+            self._fetch_remote_file(ssh, remote_client_crt, client_crt)
+            self._fetch_remote_file(ssh, remote_client_key, client_key)
+
+        return {
+            "local": {
+                "client_crt": client_crt,
+                "client_key": client_key,
+                "ca_crt": ca_crt,
+            },
+            "remote": {
+                "srv_crt": remote_srv_crt,
+                "srv_key": remote_srv_key,
+                "ca_crt": remote_ca_crt,
+            },
+        }
+
+    def _init_writer(self, ssh, cert_paths):
+        cmdline = ("%(cmd)s run -ca-cert %(ca_cert)s -key "
+                   "%(srv_key)s -cert %(srv_cert)s -listen-port "
+                   "%(listen_port)s") % {
+                       "cmd": self._writer_cmd,
+                       "ca_cert": cert_paths["ca_crt"],
+                       "srv_key": cert_paths["srv_key"],
+                       "srv_cert": cert_paths["srv_crt"],
+                       "listen_port": self._writer_port,
+            }
+        utils.create_service(
+            ssh, cmdline, _CORIOLIS_HTTP_WRITER_CMD, start=True)
+        self._inject_iptables_allow(ssh)
+        self._wait_for_conn()
+
+    def _setup_writer(self, ssh):
+        self._copy_writer(ssh)
+        paths = utils.retry_on_error()(
+            self._setup_certificates)(ssh)
+        self._crt = paths["local"]["client_crt"]
+        self._key = paths["local"]["client_key"]
+        self._ca = paths["local"]["ca_crt"]
+        utils.retry_on_error()(
+            self._init_writer)(ssh, paths["remote"])
+
+    @utils.retry_on_error()
+    def _connect_ssh(self):
+        LOG.info("Connecting to SSH host: %(ip)s:%(port)s" %
+                 {"ip": self._ip, "port": self._port})
+        ssh = paramiko.SSHClient()
+        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+        ssh.connect(
+            hostname=self._ip,
+            port=self._port,
+            username=self._username,
+            pkey=self._pkey,
+            password=self._password)
+        return ssh

BIN
coriolis/resources/coriolis-writer