|
|
@@ -260,7 +260,8 @@ class SSHBackupWriter(BaseBackupWriter):
|
|
|
|
|
|
|
|
|
class HTTPBackupWriterImpl(BaseBackupWriterImpl):
|
|
|
- def __init__(self, path, disk_id, compress_transfer=None):
|
|
|
+ def __init__(self, path, disk_id,
|
|
|
+ compress_transfer=None, compressor_count=3):
|
|
|
self._offset = None
|
|
|
self._session = None
|
|
|
self._ip = None
|
|
|
@@ -272,6 +273,7 @@ class HTTPBackupWriterImpl(BaseBackupWriterImpl):
|
|
|
self._write_error = False
|
|
|
self._id = None
|
|
|
self._exception = None
|
|
|
+ self._compressor_count = compressor_count
|
|
|
self._comp_q = eventlet.Queue(maxsize=5)
|
|
|
self._sender_q = eventlet.Queue(maxsize=5)
|
|
|
|
|
|
@@ -297,8 +299,8 @@ class HTTPBackupWriterImpl(BaseBackupWriterImpl):
|
|
|
|
|
|
@property
|
|
|
def _uri(self):
|
|
|
- return "https://%s:%s/api/v1%s" % (
|
|
|
- self._ip, self._port, self._path
|
|
|
+ return "https://%s:%s/api/v1/%s" % (
|
|
|
+ self._ip, self._port, self._path.lstrip('/')
|
|
|
)
|
|
|
|
|
|
@utils.retry_on_error()
|
|
|
@@ -336,10 +338,12 @@ class HTTPBackupWriterImpl(BaseBackupWriterImpl):
|
|
|
self._init_session()
|
|
|
self._acquire()
|
|
|
self._sender_evt = eventlet.spawn(self._sender)
|
|
|
- self._compressor_evt = [
|
|
|
- eventlet.spawn(self._compressor),
|
|
|
- eventlet.spawn(self._compressor),
|
|
|
- eventlet.spawn(self._compressor)]
|
|
|
+ if self._compressor_count is None or self._compressor_count == 0:
|
|
|
+ self._compressor_count = 1
|
|
|
+ self._compressor_evt = []
|
|
|
+ for i in range(self._compressor_count):
|
|
|
+ self._compressor_evt.append(
|
|
|
+ eventlet.spawn(self._compressor))
|
|
|
|
|
|
def seek(self, pos):
|
|
|
self._offset = pos
|
|
|
@@ -467,7 +471,8 @@ class HTTPBackupWriterImpl(BaseBackupWriterImpl):
|
|
|
class HTTPBackupWriter(BaseBackupWriter):
|
|
|
|
|
|
def __init__(self, ip, port, username, pkey,
|
|
|
- password, writer_port, volumes_info, cert_dir):
|
|
|
+ password, writer_port, volumes_info,
|
|
|
+ cert_dir, compressor_count=3):
|
|
|
self._ip = ip
|
|
|
self._port = port
|
|
|
self._username = username
|
|
|
@@ -477,6 +482,7 @@ class HTTPBackupWriter(BaseBackupWriter):
|
|
|
self._writer_port = writer_port
|
|
|
self._lock = threading.Lock()
|
|
|
self._id = str(uuid.uuid4())
|
|
|
+ self._compressor_count = compressor_count
|
|
|
self._writer_cmd = os.path.join(
|
|
|
"/usr/bin", _CORIOLIS_HTTP_WRITER_CMD)
|
|
|
self._crt = None
|
|
|
@@ -507,7 +513,8 @@ class HTTPBackupWriter(BaseBackupWriter):
|
|
|
|
|
|
path = [v for v in self._volumes_info
|
|
|
if v["disk_id"] == disk_id][0]["volume_dev"]
|
|
|
- impl = HTTPBackupWriterImpl(path, disk_id)
|
|
|
+ impl = HTTPBackupWriterImpl(
|
|
|
+ path, disk_id, self._compressor_count)
|
|
|
impl._set_info({
|
|
|
"ip": self._ip,
|
|
|
"port": self._writer_port,
|