|
|
@@ -1,4 +1,5 @@
|
|
|
import tempfile
|
|
|
+import shutil
|
|
|
import zipfile
|
|
|
import json
|
|
|
import paramiko
|
|
|
@@ -35,6 +36,38 @@ replicator_opts = [
|
|
|
CONF = cfg.CONF
|
|
|
CONF.register_opts(replicator_opts, 'replicator')
|
|
|
|
|
|
+SYSTEMD_TEMPLATE = """
|
|
|
+[Unit]
|
|
|
+Description=Coriolis replicator
|
|
|
+After=network-online.target
|
|
|
+
|
|
|
+[Service]
|
|
|
+Type=simple
|
|
|
+ExecStart=%(cmdline)s
|
|
|
+Restart=always
|
|
|
+RestartSec=5s
|
|
|
+User=%(username)s
|
|
|
+
|
|
|
+[Install]
|
|
|
+WantedBy=multi-user.target
|
|
|
+"""
|
|
|
+
|
|
|
+UPSTART_TEMPLATE = """
|
|
|
+# replicator - Coriolis replicator service
|
|
|
+#
|
|
|
+# The replicator provides access to raw disks
|
|
|
+
|
|
|
+description "Replicator service"
|
|
|
+
|
|
|
+start on runlevel [2345]
|
|
|
+stop on runlevel [!2345]
|
|
|
+
|
|
|
+respawn
|
|
|
+umask 022
|
|
|
+
|
|
|
+exec %(cmdline)s
|
|
|
+"""
|
|
|
+
|
|
|
|
|
|
class Client(object):
|
|
|
|
|
|
@@ -230,11 +263,15 @@ class Replicator(object):
|
|
|
self._ssh = self._setup_ssh()
|
|
|
self._credentials = None
|
|
|
self._cli = None
|
|
|
- self._stdout = None
|
|
|
- self._stdin = None
|
|
|
- self._stderr = None
|
|
|
self._use_tunnel = use_tunnel
|
|
|
|
|
|
+ def __del__(self):
|
|
|
+ if self._cert_dir is not None:
|
|
|
+ try:
|
|
|
+ shutil.rmtree(self._cert_dir)
|
|
|
+ except:
|
|
|
+ pass
|
|
|
+
|
|
|
def _init_replicator_client(self, credentials):
|
|
|
"""
|
|
|
Helper function to create an instance of the replicator
|
|
|
@@ -259,8 +296,7 @@ class Replicator(object):
|
|
|
|
|
|
def init_replicator(self):
|
|
|
utils.retry_on_error()(self._setup_replicator)(self._ssh)
|
|
|
- self._credentials = utils.retry_on_error()(
|
|
|
- self._fetch_certificates)()
|
|
|
+ self._credentials = self._fetch_certificates()
|
|
|
utils.retry_on_error()(
|
|
|
self._init_replicator_client)(self._credentials)
|
|
|
|
|
|
@@ -389,15 +425,65 @@ class Replicator(object):
|
|
|
ssh, "sudo usermod -aG disk %s" % REPLICATOR_USERNAME,
|
|
|
get_pty=True)
|
|
|
|
|
|
- def _check_replicator_errors(self):
|
|
|
- if self._stdout.channel.exit_status_ready():
|
|
|
- exit_code = self._stdout.channel.recv_exit_status()
|
|
|
- if exit_code:
|
|
|
- stderr = self._stderr.read()
|
|
|
- stdout = self._stdout.read()
|
|
|
- raise exception.CoriolisException(
|
|
|
- "failed to start replicator: stdout: "
|
|
|
- "%s; stderr: %s" % (stdout, stderr))
|
|
|
+ def _write_systemd(self, ssh, cmdline):
|
|
|
+ serviceFilePath = "/lib/systemd/system/replicator.service"
|
|
|
+ def _reload_and_start():
|
|
|
+ utils.exec_ssh_cmd(
|
|
|
+ ssh, "sudo systemctl daemon-reload",
|
|
|
+ get_pty=True)
|
|
|
+ utils.exec_ssh_cmd(
|
|
|
+ ssh, "sudo systemctl start replicator",
|
|
|
+ get_pty=True)
|
|
|
+
|
|
|
+ systemdService = SYSTEMD_TEMPLATE % {
|
|
|
+ "cmdline": cmdline,
|
|
|
+ "username": REPLICATOR_USERNAME,
|
|
|
+ }
|
|
|
+ utils.write_ssh_file(
|
|
|
+ ssh, '/tmp/replicator.service', systemdService)
|
|
|
+ utils.exec_ssh_cmd(
|
|
|
+ ssh,
|
|
|
+ "sudo mv /tmp/replicator.service %s" % serviceFilePath,
|
|
|
+ get_pty=True).decode().rstrip("\n")
|
|
|
+ _reload_and_start()
|
|
|
+
|
|
|
+ def _write_upstart(self, ssh, cmdline):
|
|
|
+ serviceFilePath = "/etc/init/replicator.conf"
|
|
|
+
|
|
|
+ upstartService = UPSTART_TEMPLATE % {
|
|
|
+ "cmdline": cmdline,
|
|
|
+ }
|
|
|
+ utils.write_ssh_file(
|
|
|
+ ssh, '/tmp/replicator.conf', upstartService)
|
|
|
+ utils.exec_ssh_cmd(
|
|
|
+ ssh,
|
|
|
+ "sudo mv /tmp/replicator.conf %s" % serviceFilePath,
|
|
|
+ get_pty=True).decode().rstrip(
|
|
|
+ "\n")
|
|
|
+ utils.exec_ssh_cmd(ssh, "start replicator")
|
|
|
+
|
|
|
+ @utils.retry_on_error()
|
|
|
+ def _folder_exists(self, ssh, folder):
|
|
|
+ LOG.debug("Checking if %s exists" % folder)
|
|
|
+ exists = utils.exec_ssh_cmd(
|
|
|
+ ssh, '[ -d "%s" ] && echo 1 || echo 0' % folder)
|
|
|
+ if exists.decode().rstrip("\n") == "1":
|
|
|
+ return True
|
|
|
+ return False
|
|
|
+
|
|
|
+ @utils.retry_on_error()
|
|
|
+ def _write_system_startup(self, ssh, cmdline):
|
|
|
+ # Simplistic check for init system. We usually use official images, and none
|
|
|
+ # of the supported operating systems come with both upstart and systemd
|
|
|
+ # installed side by side. So if /lib/systemd/system exists, it's usually
|
|
|
+ # systemd enabled. If not, but /etc/init exists, it's upstart
|
|
|
+ if self._folder_exists(ssh, "/lib/systemd/system"):
|
|
|
+ self._write_systemd(ssh, cmdline)
|
|
|
+ elif self._folder_exists(ssh, "/etc/init"):
|
|
|
+ self._write_upstart(ssh, cmdline)
|
|
|
+ else:
|
|
|
+ raise exception.CoriolisException(
|
|
|
+ "could not determine init system")
|
|
|
|
|
|
@utils.retry_on_error()
|
|
|
def _exec_replicator(self, ssh, args, state_file):
|
|
|
@@ -429,21 +515,7 @@ class Replicator(object):
|
|
|
"state_file": state_file,
|
|
|
"chunk_size": self._chunk_size,
|
|
|
})
|
|
|
- self._stdin, self._stdout, self._stderr = ssh.exec_command(
|
|
|
- "sudo -u %(username)s -- %(cmdline)s > /tmp/replicator.log" % {
|
|
|
- "cmdline": cmdline,
|
|
|
- "username": REPLICATOR_USERNAME,
|
|
|
- }, get_pty=True)
|
|
|
- count = 0
|
|
|
- # wait 5 seconds. If process exits, raise
|
|
|
- # TODO(gsamfira): create system service? That should take care of
|
|
|
- # restarting the replicator process if it fails.
|
|
|
- while True:
|
|
|
- if count >= 5:
|
|
|
- break
|
|
|
- self._check_replicator_errors()
|
|
|
- time.sleep(1)
|
|
|
- count += 1
|
|
|
+ self._write_system_startup(ssh, cmdline)
|
|
|
|
|
|
@utils.retry_on_error()
|
|
|
def _setup_replicator(self, ssh):
|
|
|
@@ -459,6 +531,7 @@ class Replicator(object):
|
|
|
self._setup_replicator_user(ssh)
|
|
|
self._exec_replicator(ssh, args, REPLICATOR_STATE)
|
|
|
|
|
|
+ @utils.retry_on_error(sleep_seconds=5)
|
|
|
def _fetch_certificates(self):
|
|
|
"""
|
|
|
Fetch the client certificates
|
|
|
@@ -580,28 +653,11 @@ class Replicator(object):
|
|
|
len(chunks), message_format=msg)
|
|
|
|
|
|
total = 0
|
|
|
-
|
|
|
- @utils.retry_on_error()
|
|
|
- def _download_or_reconnect(chunk):
|
|
|
- try:
|
|
|
- data = self._cli.download_chunk(
|
|
|
- devName, chunk)
|
|
|
- return data
|
|
|
- except BaseException as err:
|
|
|
- LOG.error("Error downloading chunk: %r" % err)
|
|
|
- try:
|
|
|
- self._check_replicator_errors()
|
|
|
- except BaseException as err:
|
|
|
- LOG.error("replicator is in error: %r" % err)
|
|
|
- self.init_replicator()
|
|
|
- raise
|
|
|
- raise
|
|
|
-
|
|
|
with backup_writer.open("", volume['disk_id']) as destination:
|
|
|
for chunk in chunks:
|
|
|
offset = int(chunk["offset"])
|
|
|
destination.seek(offset)
|
|
|
- data = _download_or_reconnect(chunk)
|
|
|
+ data = self._cli.download_chunk(devName, chunk)
|
|
|
destination.write(data)
|
|
|
total += 1
|
|
|
self._event_manager.set_percentage_step(
|