Просмотр исходного кода

Merge pull request #47 from gabriel-samfira/add-http-bkupwriter

Add http backup writer
Nashwan Azhari 6 лет назад
Родитель
Сommit
462a50fe38
2 измененных файлов с 419 добавлено и 0 удалено
  1. 419 0
      coriolis/providers/backup_writers.py
  2. BIN
      coriolis/resources/coriolis-writer

+ 419 - 0
coriolis/providers/backup_writers.py

@@ -5,13 +5,19 @@ import abc
 import contextlib
 import errno
 import os
+import tempfile
 import threading
+import time
+import uuid
 
+import eventlet
 from oslo_config import cfg
 from oslo_log import log as logging
 import paramiko
+import requests
 from six import with_metaclass
 
+from coriolis import constants
 from coriolis import data_transfer
 from coriolis import exception
 from coriolis import utils
@@ -24,6 +30,7 @@ opts = [
                 help='Use compression if possible during disk transfers'),
 ]
 CONF.register_opts(opts)
+_CORIOLIS_HTTP_WRITER_CMD = "coriolis-writer"
 
 LOG = logging.getLogger(__name__)
 
@@ -270,3 +277,415 @@ class SSHBackupWriter(BaseBackupWriter):
             ssh.close()
             raise
         return ssh
+
+
+class HTTPBackupWriterImpl(BaseBackupWriterImpl):
+    def __init__(self, path, disk_id,
+                 compress_transfer=None, compressor_count=3):
+        self._offset = None
+        self._session = None
+        self._ip = None
+        self._port = None
+        self._crt = None
+        self._key = None
+        self._ca = None
+        self._closing = False
+        self._write_error = False
+        self._id = None
+        self._exception = None
+        self._compressor_count = compressor_count
+        self._comp_q = eventlet.Queue(maxsize=5)
+        self._sender_q = eventlet.Queue(maxsize=5)
+
+        self._sender_evt = None
+        self._compressor_evt = None
+
+        self._compress_transfer = compress_transfer
+        if self._compress_transfer is None:
+            self._compress_transfer = CONF.compress_transfers
+        super(HTTPBackupWriterImpl, self).__init__(path, disk_id)
+
+    def _set_info(self, info):
+        self._ip = info.get("ip")
+        self._port = info.get("port")
+        self._crt = info.get("client_crt")
+        self._key = info.get("client_key")
+        self._ca = info.get("ca_crt")
+        self._id = info.get("id")
+        if not all([self._ip, self._port, self._crt,
+                    self._key, self._ca, self._id]):
+            raise exception.CoriolisException(
+                "Missing required info when creating HTTPBackupWriter")
+
+    @property
+    def _uri(self):
+        return "https://%s:%s/api/v1/%s" % (
+            self._ip, self._port, self._path.lstrip('/')
+        )
+
+    @utils.retry_on_error()
+    def _acquire(self):
+        self._ensure_session()
+        uri = "%s/acquire" % self._uri
+        headers = {"X-Client-Token": self._id}
+        resp = self._session.get(uri, headers=headers)
+        LOG.debug("Returned code: %d. Msg: %s" % (
+            resp.status_code, resp.content))
+        resp.raise_for_status()
+
+    @utils.retry_on_error()
+    def _release(self):
+        self._ensure_session()
+        uri = "%s/release" % self._uri
+        headers = {"X-Client-Token": self._id}
+        resp = self._session.get(uri, headers=headers)
+        LOG.debug("Returned code: %d. Msg: %s" %
+                  (resp.status_code, resp.content))
+        resp.raise_for_status()
+
+    def _init_session(self):
+        if self._session:
+            self._session.close()
+        sess = requests.Session()
+        sess.cert = (
+            self._crt,
+            self._key)
+        sess.verify = self._ca
+        self._session = sess
+
+    def _open(self):
+        self._closing = False
+        self._init_session()
+        self._acquire()
+        self._sender_evt = eventlet.spawn(self._sender)
+        if self._compressor_count is None or self._compressor_count == 0:
+            self._compressor_count = 1
+        self._compressor_evt = []
+        for i in range(self._compressor_count):
+            self._compressor_evt.append(
+                eventlet.spawn(self._compressor))
+
+    def seek(self, pos):
+        self._offset = pos
+
+    def truncate(self, size):
+        pass
+
+    def _ensure_session(self):
+        if not self._session:
+            self._init_session()
+            return
+        if self._write_error:
+            self._init_session()
+            return
+
+    def _compressor(self):
+        while True:
+            payload = self._comp_q.get()
+            send_payload = {
+                "encoding": None,
+                "offset": payload["offset"],
+            }
+            chunk = payload["data"]
+            if self._compress_transfer:
+                try:
+                    chunk, compressed = data_transfer.compression_proxy(
+                        chunk, constants.COMPRESSION_FORMAT_GZIP)
+                    if compressed:
+                        send_payload["encoding"] = 'gzip'
+                except Exception as err:
+                    LOG.exception(err)
+                    self._exception = err
+                    raise
+            send_payload["chunk"] = chunk
+            self._sender_q.put(send_payload)
+            self._comp_q.task_done()
+
+    def _sender(self):
+        while True:
+            payload = self._sender_q.get()
+            headers = {
+                "X-Write-Offset": str(payload["offset"]),
+                "X-Client-Token": self._id,
+            }
+            if payload.get("encoding", None):
+                headers["content-encoding"] = payload["encoding"]
+
+            @utils.retry_on_error()
+            def send():
+                self._ensure_session()
+                resp = self._session.post(
+                    self._uri, headers=headers, data=payload["chunk"]
+                )
+                LOG.debug(
+                    "Response code: %r, content: %r" %
+                    (resp.status_code, resp.content))
+                try:
+                    resp.raise_for_status()
+                    self._write_error = False
+                except Exception as err:
+                    LOG.warning(
+                        "Error writing chunk to disk %s at offset"
+                        " %s: %s" % (self._path, payload["offset"], err))
+                    self._write_error = True
+                    raise
+            try:
+                send()
+            except Exception as err:
+                # record the exception. We need to terminate
+                # the writer if this is set
+                LOG.exception(err)
+                self._exception = err
+                raise
+            self._sender_q.task_done()
+
+    @utils.retry_on_error()
+    def write(self, data):
+        if self._closing:
+            raise exception.CoriolisException(
+                "Attempted to write to a closed writer."
+            )
+        if self._exception:
+            raise exception.CoriolisException(self._exception)
+
+        payload = {
+            "offset": self._offset,
+            "data": data,
+        }
+        self._comp_q.put(payload)
+        self._offset += len(data)
+
+    def _wait_for_queues(self):
+        while (self._comp_q.unfinished_tasks or
+               self._sender_q.unfinished_tasks) and not self._exception:
+            # No error recorded, and we have tasks in the queue
+            LOG.info("Waiting for unfinished transfers to complete")
+            time.sleep(0.5)
+
+    def close(self):
+        self._closing = True
+        self._wait_for_queues()
+        if self._exception:
+            # There was an exception while writing. We still need to
+            # release the disk.
+            try:
+                self._release()
+            except Exception as err:
+                LOG.error("Failed to release disk %s: %s. Ignoring." % (
+                    self._path, err))
+            raise exception.CoriolisException(self._exception)
+
+        self._release()
+        if self._session:
+            self._session.close()
+            self._session = None
+        if self._sender_evt:
+            eventlet.kill(self._sender_evt)
+            self._sender_evt = None
+        if self._compressor_evt:
+            for i in self._compressor_evt:
+                eventlet.kill(i)
+            self._compressor_evt = None
+
+
+class HTTPBackupWriter(BaseBackupWriter):
+
+    def __init__(self, ip, port, username, pkey,
+                 password, writer_port, volumes_info,
+                 cert_dir, compressor_count=3):
+        self._ip = ip
+        self._port = port
+        self._username = username
+        self._pkey = pkey
+        self._password = password
+        self._volumes_info = volumes_info
+        self._writer_port = writer_port
+        self._lock = threading.Lock()
+        self._id = str(uuid.uuid4())
+        self._compressor_count = compressor_count
+        self._writer_cmd = os.path.join(
+            "/usr/bin", _CORIOLIS_HTTP_WRITER_CMD)
+        self._crt = None
+        self._key = None
+        self._ca = None
+        if os.path.isdir(cert_dir) is False:
+            raise exception.CoriolisException(
+                "Certificates dir %s does not exist" % cert_dir
+            )
+        self._crt_dir = cert_dir
+
+    def _wait_for_conn(self):
+        LOG.debug(
+            "waiting for coriolis-writer connectivity %s:%s" % (
+                self._ip, self._writer_port))
+        utils.wait_for_port_connectivity(
+            self._ip, self._writer_port)
+
+    def _inject_iptables_allow(self, ssh):
+        utils.exec_ssh_cmd(
+            ssh,
+            "sudo /sbin/iptables -I INPUT -p tcp --dport %s "
+            "-j ACCEPT" % self._writer_port)
+
+    def _get_impl(self, path, disk_id):
+        ssh = self._connect_ssh()
+        self._setup_writer(ssh)
+
+        path = [v for v in self._volumes_info
+                if v["disk_id"] == disk_id][0]["volume_dev"]
+        impl = HTTPBackupWriterImpl(
+            path, disk_id, self._compressor_count)
+        impl._set_info({
+            "ip": self._ip,
+            "port": self._writer_port,
+            "client_crt": self._crt,
+            "client_key": self._key,
+            "ca_crt": self._ca,
+            "id": self._id,
+        })
+        return impl
+
+    @utils.retry_on_error()
+    def _copy_writer(self, ssh):
+        local_path = os.path.join(
+            utils.get_resources_dir(), _CORIOLIS_HTTP_WRITER_CMD)
+        remote_tmp_path = os.path.join("/tmp", _CORIOLIS_HTTP_WRITER_CMD)
+        with self._lock:
+            sftp = ssh.open_sftp()
+            try:
+                # Check if the remote file already exists
+                sftp.stat(self._writer_cmd)
+            except IOError as ex:
+                if ex.errno != errno.ENOENT:
+                    raise
+                sftp.put(local_path, remote_tmp_path)
+                utils.exec_ssh_cmd(
+                    ssh,
+                    "sudo mv %s %s" % (
+                        remote_tmp_path, self._writer_cmd),
+                    get_pty=True
+                )
+                utils.exec_ssh_cmd(
+                    ssh,
+                    "sudo chmod +x %s" % self._writer_cmd,
+                    get_pty=True
+                )
+            finally:
+                sftp.close()
+
+    def _fetch_remote_file(self, ssh, remote_file, local_file):
+        with open(local_file, 'wb') as fd:
+            utils.exec_ssh_cmd(
+                ssh,
+                "sudo chmod +r %s" % remote_file, get_pty=True)
+            data = utils.retry_on_error()(
+                utils.read_ssh_file)(ssh, remote_file)
+            fd.write(data)
+
+    def _setup_certificates(self, ssh):
+        remote_base_dir = "/etc/coriolis-writer"
+
+        ca_crt_name = "ca-cert.pem"
+        client_crt_name = "client-cert.pem"
+        client_key_name = "client-key.pem"
+
+        srv_crt_name = "srv-cert.pem"
+        srv_key_name = "srv-key.pem"
+
+        remote_ca_crt = os.path.join(remote_base_dir, ca_crt_name)
+        remote_client_crt = os.path.join(remote_base_dir, client_crt_name)
+        remote_client_key = os.path.join(remote_base_dir, client_key_name)
+        remote_srv_crt = os.path.join(remote_base_dir, srv_crt_name)
+        remote_srv_key = os.path.join(remote_base_dir, srv_key_name)
+
+        ca_crt = os.path.join(self._crt_dir, ca_crt_name)
+        client_crt = os.path.join(self._crt_dir, client_crt_name)
+        client_key = os.path.join(self._crt_dir, client_key_name)
+
+        exist = []
+        for i in (remote_ca_crt, remote_client_crt, remote_client_key,
+                  remote_srv_crt, remote_srv_key):
+            exist.append(utils.test_ssh_path(ssh, i))
+
+        force_fetch = False
+        if not all(exist):
+            utils.exec_ssh_cmd(
+                ssh, "sudo mkdir -p %s" % remote_base_dir, get_pty=True)
+            utils.exec_ssh_cmd(
+                ssh,
+                "sudo %(writer_cmd)s generate-certificates -output-dir "
+                "%(cert_dir)s -certificate-hosts %(extra_hosts)s" % {
+                    "writer_cmd": self._writer_cmd,
+                    "cert_dir": remote_base_dir,
+                    "extra_hosts": self._ip,
+                },
+                get_pty=True)
+            force_fetch = True
+
+        exists = []
+        for i in (ca_crt, client_crt, client_key):
+            exists.append(os.path.isfile(i))
+
+        if not all(exists) or force_fetch:
+            # certificates either are missing, or have been regenerated
+            # on the writer worker. We need to fetch them.
+            self._fetch_remote_file(ssh, remote_ca_crt, ca_crt)
+            self._fetch_remote_file(ssh, remote_client_crt, client_crt)
+            self._fetch_remote_file(ssh, remote_client_key, client_key)
+
+        return {
+            "local": {
+                "client_crt": client_crt,
+                "client_key": client_key,
+                "ca_crt": ca_crt,
+            },
+            "remote": {
+                "srv_crt": remote_srv_crt,
+                "srv_key": remote_srv_key,
+                "ca_crt": remote_ca_crt,
+            },
+        }
+
+    def _init_writer(self, ssh, cert_paths):
+        cmdline = ("%(cmd)s run -ca-cert %(ca_cert)s -key "
+                   "%(srv_key)s -cert %(srv_cert)s -listen-port "
+                   "%(listen_port)s") % {
+                       "cmd": self._writer_cmd,
+                       "ca_cert": cert_paths["ca_crt"],
+                       "srv_key": cert_paths["srv_key"],
+                       "srv_cert": cert_paths["srv_crt"],
+                       "listen_port": self._writer_port,
+            }
+        utils.create_service(
+            ssh, cmdline, _CORIOLIS_HTTP_WRITER_CMD, start=True)
+        self._inject_iptables_allow(ssh)
+        self._wait_for_conn()
+
+    def _setup_writer(self, ssh):
+        self._copy_writer(ssh)
+        paths = utils.retry_on_error()(
+            self._setup_certificates)(ssh)
+        self._crt = paths["local"]["client_crt"]
+        self._key = paths["local"]["client_key"]
+        self._ca = paths["local"]["ca_crt"]
+        utils.retry_on_error()(
+            self._init_writer)(ssh, paths["remote"])
+
+    @utils.retry_on_error(sleep_seconds=30)
+    def _connect_ssh(self):
+        LOG.info("Connecting to SSH host: %(ip)s:%(port)s" %
+                 {"ip": self._ip, "port": self._port})
+        ssh = paramiko.SSHClient()
+        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+        try:
+            ssh.connect(
+                hostname=self._ip,
+                port=self._port,
+                username=self._username,
+                pkey=self._pkey,
+                password=self._password)
+        except:
+            # No need to log the error as we just raise
+            ssh.close()
+            raise
+        return ssh

BIN
coriolis/resources/coriolis-writer