|
|
@@ -439,8 +439,6 @@ class HTTPBackupWriterImpl(BaseBackupWriterImpl):
|
|
|
self._compressor_evt = None
|
|
|
|
|
|
self._compress_transfer = compress_transfer
|
|
|
- if self._compress_transfer is None:
|
|
|
- self._compress_transfer = CONF.compress_transfers
|
|
|
super(HTTPBackupWriterImpl, self).__init__(path, disk_id)
|
|
|
|
|
|
def _set_info(self, info):
|
|
|
@@ -542,6 +540,7 @@ class HTTPBackupWriterImpl(BaseBackupWriterImpl):
|
|
|
def _sender(self):
|
|
|
while True:
|
|
|
payload = self._sender_q.get()
|
|
|
+ LOG.debug("Got send payload")
|
|
|
headers = {
|
|
|
"X-Write-Offset": str(payload["offset"]),
|
|
|
"X-Client-Token": self._id,
|
|
|
@@ -628,29 +627,26 @@ class HTTPBackupWriterImpl(BaseBackupWriterImpl):
|
|
|
|
|
|
class HTTPBackupWriter(BaseBackupWriter):
|
|
|
|
|
|
- def __init__(self, ip, port, username, pkey,
|
|
|
- password, writer_port, volumes_info,
|
|
|
- cert_dir, compressor_count=3):
|
|
|
+ def __init__(self, ip, port, volumes_info, ssh=None,
|
|
|
+ compressor_count=3, writer_creds=None):
|
|
|
self._ip = ip
|
|
|
self._port = port
|
|
|
- self._username = username
|
|
|
- self._pkey = pkey
|
|
|
- self._password = password
|
|
|
self._volumes_info = volumes_info
|
|
|
- self._writer_port = writer_port
|
|
|
+ self._writer_port = port
|
|
|
self._lock = threading.Lock()
|
|
|
self._id = str(uuid.uuid4())
|
|
|
self._compressor_count = compressor_count
|
|
|
self._writer_cmd = os.path.join(
|
|
|
"/usr/bin", _CORIOLIS_HTTP_WRITER_CMD)
|
|
|
- self._crt = None
|
|
|
- self._key = None
|
|
|
- self._ca = None
|
|
|
- if os.path.isdir(cert_dir) is False:
|
|
|
+ self._ssh = ssh
|
|
|
+ self._writer_creds = writer_creds
|
|
|
+ self._crt_dir = tempfile.mkdtemp()
|
|
|
+ if not self._ssh and not self._writer_creds:
|
|
|
raise exception.CoriolisException(
|
|
|
- "Certificates dir %s does not exist" % cert_dir
|
|
|
- )
|
|
|
- self._crt_dir = cert_dir
|
|
|
+ "Either ssh or writer_creds must be specified")
|
|
|
+ if self._ssh and self._writer_creds:
|
|
|
+ raise exception.CoriolisException(
|
|
|
+ "ssh and writer_creds are mutually exclusive")
|
|
|
|
|
|
def _wait_for_conn(self):
|
|
|
LOG.debug(
|
|
|
@@ -665,21 +661,44 @@ class HTTPBackupWriter(BaseBackupWriter):
|
|
|
"sudo /sbin/iptables -I INPUT -p tcp --dport %s "
|
|
|
"-j ACCEPT" % self._writer_port, get_pty=True)
|
|
|
|
|
|
+ def _write_cert_files(self):
|
|
|
+ if not self._writer_creds:
|
|
|
+ raise exception.CoriolisException(
|
|
|
+ "writer_creds not set")
|
|
|
+ crt_file = tempfile.mkstemp()[1]
|
|
|
+ key_file = tempfile.mkstemp()[1]
|
|
|
+ ca_crt_file = tempfile.mkstemp()[1]
|
|
|
+ with open(crt_file, "w") as fd:
|
|
|
+ fd.write(self._writer_creds["client_crt"])
|
|
|
+ with open(key_file, "w") as fd:
|
|
|
+ fd.write(self._writer_creds["client_key"])
|
|
|
+ with open(ca_crt_file, "w") as fd:
|
|
|
+ fd.write(self._writer_creds["ca_crt"])
|
|
|
+ return {
|
|
|
+ "client_crt": crt_file,
|
|
|
+ "client_key": key_file,
|
|
|
+ "ca_crt": ca_crt_file,
|
|
|
+ }
|
|
|
+
|
|
|
def _get_impl(self, path, disk_id):
|
|
|
- ssh = self._connect_ssh()
|
|
|
- _disable_lvm2_lvmetad(ssh)
|
|
|
- self._setup_writer(ssh)
|
|
|
+ if self._ssh:
|
|
|
+ _disable_lvm2_lvmetad(self._ssh)
|
|
|
+ cert_paths = self._setup_writer(self._ssh)["local"]
|
|
|
+ else:
|
|
|
+ cert_paths = self._write_cert_files()
|
|
|
|
|
|
path = [v for v in self._volumes_info
|
|
|
if v["disk_id"] == disk_id][0]["volume_dev"]
|
|
|
impl = HTTPBackupWriterImpl(
|
|
|
- path, disk_id, self._compressor_count)
|
|
|
+ path, disk_id,
|
|
|
+ compressor_count=self._compressor_count,
|
|
|
+ compress_transfer=CONF.compress_transfers)
|
|
|
impl._set_info({
|
|
|
"ip": self._ip,
|
|
|
"port": self._writer_port,
|
|
|
- "client_crt": self._crt,
|
|
|
- "client_key": self._key,
|
|
|
- "ca_crt": self._ca,
|
|
|
+ "client_crt": cert_paths["client_crt"],
|
|
|
+ "client_key": cert_paths["client_key"],
|
|
|
+ "ca_crt": cert_paths["ca_crt"],
|
|
|
"id": self._id,
|
|
|
})
|
|
|
return impl
|
|
|
@@ -804,27 +823,6 @@ class HTTPBackupWriter(BaseBackupWriter):
|
|
|
self._copy_writer(ssh)
|
|
|
paths = utils.retry_on_error()(
|
|
|
self._setup_certificates)(ssh)
|
|
|
- self._crt = paths["local"]["client_crt"]
|
|
|
- self._key = paths["local"]["client_key"]
|
|
|
- self._ca = paths["local"]["ca_crt"]
|
|
|
utils.retry_on_error()(
|
|
|
self._init_writer)(ssh, paths["remote"])
|
|
|
-
|
|
|
- @utils.retry_on_error(sleep_seconds=30)
|
|
|
- def _connect_ssh(self):
|
|
|
- LOG.info("Connecting to SSH host: %(ip)s:%(port)s" %
|
|
|
- {"ip": self._ip, "port": self._port})
|
|
|
- ssh = paramiko.SSHClient()
|
|
|
- ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
|
|
- try:
|
|
|
- ssh.connect(
|
|
|
- hostname=self._ip,
|
|
|
- port=self._port,
|
|
|
- username=self._username,
|
|
|
- pkey=self._pkey,
|
|
|
- password=self._password)
|
|
|
- except (Exception, KeyboardInterrupt):
|
|
|
- # No need to log the error as we just raise
|
|
|
- ssh.close()
|
|
|
- raise
|
|
|
- return ssh
|
|
|
+ return paths
|