|
|
@@ -2,7 +2,6 @@ import tempfile
|
|
|
import zipfile
|
|
|
import json
|
|
|
import paramiko
|
|
|
-import threading
|
|
|
import errno
|
|
|
import os
|
|
|
import time
|
|
|
@@ -10,6 +9,7 @@ import requests
|
|
|
|
|
|
from coriolis import utils
|
|
|
from coriolis import exception
|
|
|
+from oslo_config import cfg
|
|
|
|
|
|
from oslo_log import log as logging
|
|
|
from oslo_utils import units
|
|
|
@@ -26,6 +26,15 @@ REPLICATOR_STATE = "/tmp/replicator_state.json"
|
|
|
REPLICATOR_USERNAME = "replicator"
|
|
|
DEFAULT_REPLICATOR_PORT = 4433
|
|
|
|
|
|
+replicator_opts = [
|
|
|
+ cfg.IntOpt('port',
|
|
|
+ default=DEFAULT_REPLICATOR_PORT,
|
|
|
+ help='The replicator TCP port.'),
|
|
|
+]
|
|
|
+
|
|
|
+CONF = cfg.CONF
|
|
|
+CONF.register_opts(replicator_opts, 'replicator')
|
|
|
+
|
|
|
|
|
|
class Client(object):
|
|
|
|
|
|
@@ -202,7 +211,6 @@ class Replicator(object):
|
|
|
self._conn_info = conn_info
|
|
|
self._config_dir = None
|
|
|
self._cert_dir = None
|
|
|
- self._lock = threading.Lock()
|
|
|
self._volumes_info = volumes_info
|
|
|
self._use_compression = use_compression
|
|
|
self._watch_devices = watch_devices
|
|
|
@@ -322,18 +330,12 @@ class Replicator(object):
|
|
|
return filename
|
|
|
|
|
|
def _parse_replicator_conn_info(self, conn_info):
|
|
|
- # the replicator only really needs an IP and
|
|
|
- # a port. Authentication is done using client
|
|
|
- # certificates, which we fetch after the
|
|
|
- # application is started.
|
|
|
-
|
|
|
# The IP should be the same one as the SSH IP.
|
|
|
- # Only the port will differ
|
|
|
+ # Only the port will differ, and that is configurable.
|
|
|
ip = conn_info.get("ip", None)
|
|
|
- port = conn_info.get("replicator_port", DEFAULT_REPLICATOR_PORT)
|
|
|
return {
|
|
|
"ip": ip,
|
|
|
- "port": port,
|
|
|
+ "port": CONF.replicator.port,
|
|
|
}
|
|
|
|
|
|
def _copy_file(self, ssh, localPath, remotePath):
|
|
|
@@ -343,18 +345,17 @@ class Replicator(object):
|
|
|
except BaseException:
|
|
|
pass
|
|
|
|
|
|
- with self._lock:
|
|
|
- sftp = ssh.open_sftp()
|
|
|
- try:
|
|
|
- # Check if the remote file already exists
|
|
|
- sftp.stat(remotePath)
|
|
|
- except IOError as ex:
|
|
|
- if ex.errno != errno.ENOENT:
|
|
|
- raise
|
|
|
- sftp.put(localPath, tmp)
|
|
|
- utils.exec_ssh_cmd(ssh, "sudo mv %s %s" % (tmp, remotePath))
|
|
|
- finally:
|
|
|
- sftp.close()
|
|
|
+ sftp = paramiko.SFTPClient.from_transport(ssh.get_transport())
|
|
|
+ try:
|
|
|
+ # Check if the remote file already exists
|
|
|
+ sftp.stat(remotePath)
|
|
|
+ except IOError as ex:
|
|
|
+ if ex.errno != errno.ENOENT:
|
|
|
+ raise
|
|
|
+ sftp.put(localPath, tmp)
|
|
|
+ utils.exec_ssh_cmd(ssh, "sudo mv %s %s" % (tmp, remotePath))
|
|
|
+ finally:
|
|
|
+ sftp.close()
|
|
|
|
|
|
@utils.retry_on_error()
|
|
|
def _copy_replicator_cmd(self, ssh):
|
|
|
@@ -471,12 +472,12 @@ class Replicator(object):
|
|
|
utils.exec_ssh_cmd(
|
|
|
self._ssh, "sudo chmod +r /tmp/creds.zip")
|
|
|
|
|
|
- with self._lock:
|
|
|
- sftp = self._ssh.open_sftp()
|
|
|
- try:
|
|
|
- sftp.get("/tmp/creds.zip", localCertZip, callback=progressf)
|
|
|
- finally:
|
|
|
- sftp.close()
|
|
|
+ sftp = paramiko.SFTPClient.from_transport(
|
|
|
+ self._ssh.get_transport())
|
|
|
+ try:
|
|
|
+ sftp.get("/tmp/creds.zip", localCertZip, callback=progressf)
|
|
|
+ finally:
|
|
|
+ sftp.close()
|
|
|
|
|
|
zFile = zipfile.ZipFile(localCertZip)
|
|
|
zFile.extractall(path=self._cert_dir)
|