Просмотр исходного кода

Merge pull request #48 from gabriel-samfira/cleanup-replicator

Cleanup replicator integration
Nashwan Azhari 6 лет назад
Родитель
Сommit
b3ce69bd32
2 измененных файлов с 125 добавлено и 183 удалено
  1. 125 183
      coriolis/providers/replicator.py
  2. BIN
      coriolis/resources/replicator

+ 125 - 183
coriolis/providers/replicator.py

@@ -1,30 +1,33 @@
-import tempfile
-import shutil
-import zipfile
-import json
-import paramiko
 import errno
+import json
 import os
-import time
+import paramiko
 import requests
+import shutil
+import tempfile
+import time
+import uuid
 
-from coriolis import utils
-from coriolis import exception
 from oslo_config import cfg
-
 from oslo_log import log as logging
 from oslo_utils import units
-
 from sshtunnel import SSHTunnelForwarder
 
+from coriolis import exception
+from coriolis import utils
+
+
 LOG = logging.getLogger(__name__)
 
 HASH_METHOD_SHA256 = "sha256"
 HASH_METHOD_XXHASH = "xxhash"
 
 REPLICATOR_PATH = "/usr/bin/replicator"
+REPLICATOR_DIR = "/etc/coriolis-replicator"
 REPLICATOR_STATE = "/tmp/replicator_state.json"
 REPLICATOR_USERNAME = "replicator"
+REPLICATOR_SVC_NAME = "coriolis-replicator"
+
 DEFAULT_REPLICATOR_PORT = 4433
 
 replicator_opts = [
@@ -36,38 +39,6 @@ replicator_opts = [
 CONF = cfg.CONF
 CONF.register_opts(replicator_opts, 'replicator')
 
-SYSTEMD_TEMPLATE = """
-[Unit]
-Description=Coriolis replicator
-After=network-online.target
-
-[Service]
-Type=simple
-ExecStart=%(cmdline)s
-Restart=always
-RestartSec=5s
-User=%(username)s
-
-[Install]
-WantedBy=multi-user.target
-"""
-
-UPSTART_TEMPLATE = """
-# replicator - Coriolis replicator service
-#
-# The replicator provides access to raw disks
-
-description     "Replicator service"
-
-start on runlevel [2345]
-stop on runlevel [!2345]
-
-respawn
-umask 022
-
-exec %(cmdline)s
-"""
-
 
 class Client(object):
 
@@ -253,7 +224,7 @@ class Replicator(object):
         self._repl_state = replica_state
         self._conn_info = conn_info
         self._config_dir = None
-        self._cert_dir = None
+        self._cert_dir = tempfile.mkdtemp()
         self._volumes_info = volumes_info
 
         self._use_compression = use_compression
@@ -299,8 +270,8 @@ class Replicator(object):
         return ssh
 
     def init_replicator(self):
-        utils.retry_on_error()(self._setup_replicator)(self._ssh)
-        self._credentials = self._fetch_certificates()
+        self._credentials = utils.retry_on_error()(
+            self._setup_replicator)(self._ssh)
         utils.retry_on_error()(
             self._init_replicator_client)(self._credentials)
 
@@ -390,12 +361,9 @@ class Replicator(object):
             "port": CONF.replicator.port,
         }
 
+    @utils.retry_on_error()
     def _copy_file(self, ssh, localPath, remotePath):
-        tmp = tempfile.mkstemp()[1]
-        try:
-            os.remove(tmp)
-        except BaseException:
-            pass
+        tmp = os.path.join("/tmp", str(uuid.uuid4()))
 
         sftp = paramiko.SFTPClient.from_transport(ssh.get_transport())
         try:
@@ -418,9 +386,13 @@ class Replicator(object):
         utils.exec_ssh_cmd(
             ssh, "sudo chmod +x %s" % REPLICATOR_PATH, get_pty=True)
 
+    @utils.retry_on_error()
     def _setup_replicator_user(self, ssh):
         user_exists = utils.exec_ssh_cmd(
-            ssh, "getent passwd replicator > /dev/null && echo 1 || echo 0")
+            ssh,
+            "getent passwd %(user)s > /dev/null && echo 1 || echo 0" % {
+                "user": REPLICATOR_USERNAME
+            })
         if int(user_exists) == 0:
             utils.exec_ssh_cmd(
                 ssh, "sudo useradd -m -s /bin/bash %s" % REPLICATOR_USERNAME,
@@ -429,163 +401,133 @@ class Replicator(object):
                 ssh, "sudo usermod -aG disk %s" % REPLICATOR_USERNAME,
                 get_pty=True)
 
-    def _write_systemd(self, ssh, cmdline):
-        serviceFilePath = "/lib/systemd/system/replicator.service"
-        def _reload_and_start():
-            utils.exec_ssh_cmd(
-                ssh, "sudo systemctl daemon-reload",
-                get_pty=True)
-            utils.exec_ssh_cmd(
-                ssh, "sudo systemctl start replicator",
-                get_pty=True)
-
-        systemdService = SYSTEMD_TEMPLATE % {
-            "cmdline": cmdline,
-            "username": REPLICATOR_USERNAME,
-        }
-        utils.write_ssh_file(
-            ssh, '/tmp/replicator.service', systemdService)
-        utils.exec_ssh_cmd(
-            ssh,
-            "sudo mv /tmp/replicator.service %s" % serviceFilePath,
-            get_pty=True).decode().rstrip("\n")
-        _reload_and_start()
-
-    def _write_upstart(self, ssh, cmdline):
-        serviceFilePath = "/etc/init/replicator.conf"
-
-        upstartService = UPSTART_TEMPLATE % {
-            "cmdline": cmdline,
-        }
-        utils.write_ssh_file(
-            ssh, '/tmp/replicator.conf', upstartService)
-        utils.exec_ssh_cmd(
-            ssh,
-            "sudo mv /tmp/replicator.conf %s" % serviceFilePath,
-            get_pty=True).decode().rstrip(
-                "\n")
-        utils.exec_ssh_cmd(ssh, "start replicator")
-
-    @utils.retry_on_error()
-    def _folder_exists(self, ssh, folder):
-        LOG.debug("Checking if %s exists" % folder)
-        exists = utils.exec_ssh_cmd(
-            ssh, '[ -d "%s" ] && echo 1 || echo 0' % folder)
-        if exists.decode().rstrip("\n") == "1":
-            return True
-        return False
-
     @utils.retry_on_error()
-    def _write_system_startup(self, ssh, cmdline):
-        # Simplistic check for init system. We usually use official images, and none
-        # of the supported operating systems come with both upstart and systemd
-        # installed side by side. So if /lib/systemd/system exists, it's usually
-        # systemd enabled. If not, but /etc/init exists, it's upstart
-        if self._folder_exists(ssh, "/lib/systemd/system"):
-            self._write_systemd(ssh, cmdline)
-        elif self._folder_exists(ssh, "/etc/init"):
-            self._write_upstart(ssh, cmdline)
-        else:
-            raise exception.CoriolisException(
-                "could not determine init system")
-
-    @utils.retry_on_error()
-    def _exec_replicator(self, ssh, args, state_file):
-        ip = args["ip"]
-        # add 127.0.0.1 to the mix. If we need to tunnel, we will need it
-        cert_hosts = ",".join([ip, "127.0.0.1"])
-        port = args["port"]
-        self._config_dir = utils.exec_ssh_cmd(
-            ssh, "mktemp -d").decode().rstrip("\n")
-        utils.exec_ssh_cmd(
-            ssh,
-            "sudo chown %(user)s:%(user)s %(config_dir)s" % {
-                "config_dir": self._config_dir,
-                "user": REPLICATOR_USERNAME,
-            }, get_pty=True)
-        cmdline = ("/usr/bin/replicator -certificate-hosts=%(cert_hosts)s "
-                   "-config-dir=%(cfgdir)s -hash-method=%(hash_method)s "
+    def _exec_replicator(self, ssh, port, certs, state_file):
+        cmdline = ("%(replicator_path)s run -hash-method=%(hash_method)s "
                    "-ignore-mounted-disks=%(ignore_mounted)s "
                    "-listen-port=%(listen_port)s "
                    "-chunk-size=%(chunk_size)s "
                    "-watch-devices=%(watch_devs)s "
-                   "-state-file=%(state_file)s" % {
-                       "cfgdir": self._config_dir,
-                       "cert_hosts": cert_hosts,
+                   "-state-file=%(state_file)s "
+                   "-ca-cert=%(ca_cert)s -cert=%(srv_cert)s "
+                   "-key=%(srv_key)s" % {
+                       "replicator_path": REPLICATOR_PATH,
                        "hash_method": self._hash_method,
                        "ignore_mounted": json.dumps(self._ignore_mounted),
                        "watch_devs": json.dumps(self._watch_devices),
                        "listen_port": str(port),
                        "state_file": state_file,
                        "chunk_size": self._chunk_size,
+                       "ca_cert": certs["ca_crt"],
+                       "srv_cert": certs["srv_crt"],
+                       "srv_key": certs["srv_key"],
                    })
-        self._write_system_startup(ssh, cmdline)
+        utils.create_service(
+            ssh, cmdline, REPLICATOR_SVC_NAME,
+            run_as=REPLICATOR_USERNAME)
 
-    @utils.retry_on_error()
-    def _setup_replicator(self, ssh):
-        # copy the binary and execute it.
-        state_file = self._get_replicator_state_file()
-        self._copy_file(ssh, state_file, REPLICATOR_STATE)
-        utils.exec_ssh_cmd(
-            ssh, "sudo chmod 755 %s" % REPLICATOR_STATE, get_pty=True)
-        os.remove(state_file)
-
-        args = self._parse_replicator_conn_info(self._conn_info)
-        self._copy_replicator_cmd(ssh)
-        self._setup_replicator_user(ssh)
-        self._exec_replicator(ssh, args, REPLICATOR_STATE)
+    def _fetch_remote_file(self, ssh, remote_file, local_file):
+        # TODO(gsamfira): make this re-usable
+        with open(local_file, 'wb') as fd:
+            data = utils.retry_on_error()(
+                utils.read_ssh_file)(ssh, remote_file)
+            fd.write(data)
 
     @utils.retry_on_error(sleep_seconds=5)
-    def _fetch_certificates(self):
-        """
-        Fetch the client certificates
-        Returns a dict with paths to the certificates
-        {
-            "client_cert": "/tmp/tmp.RAA6wsQG4s/client-cert.pem",
-            "client_key": "/tmp/tmp.RAA6wsQG4s/client-key.pem",
-            "ca_cert": "/tmp/tmp.RAA6wsQG4s/ca-cert.pem",
-        }
-        """
-        if self._cert_dir is None:
-            self._cert_dir = tempfile.mkdtemp()
+    def _setup_certificates(self, ssh, args):
+        # TODO(gsamfira): coriolis-replicator and coriolis-writer share
+        # the functionality of being able to generate certificates
+        # This will either be replaced with proper certificate management
+        # in Coriolis, and the needed files will be pushed to the services
+        # that need them (userdata or ssh), or the two applications will be
+        # merged into one, and we will deduplicate this functionallity.
+        remote_base_dir = REPLICATOR_DIR
+        ip = args["ip"]
 
-        clientCrt = os.path.join(self._cert_dir, "client-cert.pem")
-        clientKey = os.path.join(self._cert_dir, "client-key.pem")
-        caCert = os.path.join(self._cert_dir, "ca-cert.pem")
+        ca_crt_name = "ca-cert.pem"
+        client_crt_name = "client-cert.pem"
+        client_key_name = "client-key.pem"
 
-        def progressf(curr, total):
-            LOG.debug("Copied %s/%s", curr, total)
+        srv_crt_name = "srv-cert.pem"
+        srv_key_name = "srv-key.pem"
 
-        if self._config_dir is None:
-            raise exception.CoriolisException(
-                "Not initialized. Run _setup_replicator().")
+        remote_ca_crt = os.path.join(remote_base_dir, ca_crt_name)
+        remote_client_crt = os.path.join(remote_base_dir, client_crt_name)
+        remote_client_key = os.path.join(remote_base_dir, client_key_name)
+        remote_srv_crt = os.path.join(remote_base_dir, srv_crt_name)
+        remote_srv_key = os.path.join(remote_base_dir, srv_key_name)
 
-        localCertZip = os.path.join(self._cert_dir, "client-creds.zip")
-        zipFile = os.path.join(
-            self._config_dir, "ssl/client/client-creds.zip")
+        ca_crt = os.path.join(self._cert_dir, ca_crt_name)
+        client_crt = os.path.join(self._cert_dir, client_crt_name)
+        client_key = os.path.join(self._cert_dir, client_key_name)
 
-        utils.exec_ssh_cmd(
-            self._ssh, "sudo cp -f %s /tmp/creds.zip" % zipFile, get_pty=True)
-        utils.exec_ssh_cmd(
-            self._ssh, "sudo chmod +r /tmp/creds.zip", get_pty=True)
+        exist = []
+        for i in (remote_ca_crt, remote_client_crt, remote_client_key,
+                  remote_srv_crt, remote_srv_key):
+            exist.append(utils.test_ssh_path(ssh, i))
 
-        sftp = paramiko.SFTPClient.from_transport(
-            self._ssh.get_transport())
-        try:
-            sftp.get("/tmp/creds.zip", localCertZip, callback=progressf)
-        finally:
-            sftp.close()
+        force_fetch = False
+        if not all(exist):
+            utils.exec_ssh_cmd(
+                ssh, "sudo mkdir -p %s" % remote_base_dir, get_pty=True)
+            utils.exec_ssh_cmd(
+                ssh,
+                "sudo %(replicator_cmd)s gen-certs -output-dir "
+                "%(cert_dir)s -certificate-hosts %(extra_hosts)s" % {
+                    "replicator_cmd": REPLICATOR_PATH,
+                    "cert_dir": remote_base_dir,
+                    "extra_hosts": ip,
+                },
+                get_pty=True)
+            utils.exec_ssh_cmd(
+                ssh, "sudo chown -R %(user)s:%(user)s %(cert_dir)s" % {
+                        "cert_dir": remote_base_dir,
+                        "user": REPLICATOR_USERNAME
+                    }, get_pty=True)
+            force_fetch = True
+
+        exists = []
+        for i in (ca_crt, client_crt, client_key):
+            exists.append(os.path.isfile(i))
+
+        if not all(exists) or force_fetch:
+            # certificates either are missing, or have been regenerated
+            # on the writer worker. We need to fetch them.
+            self._fetch_remote_file(ssh, remote_ca_crt, ca_crt)
+            self._fetch_remote_file(ssh, remote_client_crt, client_crt)
+            self._fetch_remote_file(ssh, remote_client_key, client_key)
 
-        zFile = zipfile.ZipFile(localCertZip)
-        zFile.extractall(path=self._cert_dir)
-        zFile.close()
-        os.remove(localCertZip)
         return {
-            "client_cert": clientCrt,
-            "client_key": clientKey,
-            "ca_cert": caCert,
+            "local": {
+                "client_cert": client_crt,
+                "client_key": client_key,
+                "ca_cert": ca_crt,
+            },
+            "remote": {
+                "srv_crt": remote_srv_crt,
+                "srv_key": remote_srv_key,
+                "ca_crt": remote_ca_crt,
+            },
         }
 
+    @utils.retry_on_error()
+    def _setup_replicator(self, ssh):
+        # copy the binary, set up the service, generate certificates,
+        # start service
+        state_file = self._get_replicator_state_file()
+        self._copy_file(ssh, state_file, REPLICATOR_STATE)
+        utils.exec_ssh_cmd(
+            ssh, "sudo chmod 755 %s" % REPLICATOR_STATE, get_pty=True)
+        os.remove(state_file)
+
+        args = self._parse_replicator_conn_info(self._conn_info)
+        self._copy_replicator_cmd(ssh)
+        self._setup_replicator_user(ssh)
+        certs = self._setup_certificates(ssh, args)
+        self._exec_replicator(
+            ssh, args["port"], certs["remote"], REPLICATOR_STATE)
+        return certs["local"]
+
     def _get_size_from_chunks(self, chunks):
         ret = 0
         for chunk in chunks:

BIN
coriolis/resources/replicator