Просмотр исходного кода

Use queueing for SSH backup writer

This should improve transfer speed when using the SSH backup
writer. Currently, there are 3 operations that happen:

1) Read from the source
2) Compress and encode payload
3) Send payload to destination

These 3 operations run in sequence for each chunk, which means that
while we are reading from the source, we are not compressing or
sending; while we are compressing and encoding, we are not reading
from the source or sending to the destination, and so on. Using queues
allows the three stages to overlap, so more work is done in parallel.
Gabriel-Adrian Samfira 6 лет назад
Родитель
Сommit
7af5f63dab
2 измененных файлов с 91 добавлено и 7 удалено
  1. 1 0
      coriolis/data_transfer.py
  2. 90 7
      coriolis/providers/backup_writers.py

+ 1 - 0
coriolis/data_transfer.py

@@ -30,6 +30,7 @@ CONF = cfg.CONF
 CONF.register_opts(compressor_opts)
 CONF.register_opts(compressor_opts)
 
 
 LOG = logging.getLogger(__name__)
 LOG = logging.getLogger(__name__)
+
 _COMPRESS_FUNC = {
 _COMPRESS_FUNC = {
     constants.COMPRESSION_FORMAT_GZIP: gzip.compress,
     constants.COMPRESSION_FORMAT_GZIP: gzip.compress,
     constants.COMPRESSION_FORMAT_ZLIB: zlib.compress,
     constants.COMPRESSION_FORMAT_ZLIB: zlib.compress,

+ 90 - 7
coriolis/providers/backup_writers.py

@@ -3,6 +3,7 @@
 
 
 import abc
 import abc
 import contextlib
 import contextlib
+import datetime
 import errno
 import errno
 import os
 import os
 import tempfile
 import tempfile
@@ -162,13 +163,22 @@ class FileBackupWriter(BaseBackupWriter):
 
 
 
 
 class SSHBackupWriterImpl(BaseBackupWriterImpl):
 class SSHBackupWriterImpl(BaseBackupWriterImpl):
-    def __init__(self, path, disk_id, compress_transfer=None):
+    def __init__(self, path, disk_id, compress_transfer=None,
+                 encoder_count=3):
         self._msg_id = None
         self._msg_id = None
         self._stdin = None
         self._stdin = None
         self._stdout = None
         self._stdout = None
         self._stderr = None
         self._stderr = None
         self._offset = None
         self._offset = None
         self._ssh = None
         self._ssh = None
+        self._sender_q = eventlet.Queue(maxsize=5)
+        self._enc_q = eventlet.Queue(maxsize=5)
+        self._sender_evt = None
+        self._encoder_evt = []
+        self._encoder_cnt = encoder_count
+        self._exception = None
+        self._closing = False
+
         self._compress_transfer = compress_transfer
         self._compress_transfer = compress_transfer
         if self._compress_transfer is None:
         if self._compress_transfer is None:
             self._compress_transfer = CONF.compress_transfers
             self._compress_transfer = CONF.compress_transfers
@@ -184,10 +194,10 @@ class SSHBackupWriterImpl(BaseBackupWriterImpl):
         self._stdin, self._stdout, self._stderr = self._ssh.exec_command(
         self._stdin, self._stdout, self._stderr = self._ssh.exec_command(
             "chmod +x write_data && sudo ./write_data")
             "chmod +x write_data && sudo ./write_data")
 
 
-    def _encode_data(self, content):
+    def _encode_data(self, content, offset, msg_id):
         msg = data_transfer.encode_data(
         msg = data_transfer.encode_data(
-            self._msg_id, self._path,
-            self._offset, content,
+            msg_id, self._path,
+            offset, content,
             compress=self._compress_transfer)
             compress=self._compress_transfer)
 
 
         LOG.debug(
         LOG.debug(
@@ -213,13 +223,18 @@ class SSHBackupWriterImpl(BaseBackupWriterImpl):
                 raise exception.CoriolisException(
                 raise exception.CoriolisException(
                     "write_data exited with error code %r (%s)" % (
                     "write_data exited with error code %r (%s)" % (
                         ret_val, _WRITER_ERR_MAP.get(int(ret_val))))
                         ret_val, _WRITER_ERR_MAP.get(int(ret_val))))
-        self._msg_id += 1
+
         self._stdin.write(data)
         self._stdin.write(data)
         self._stdin.flush()
         self._stdin.flush()
         self._stdout.read(4)
         self._stdout.read(4)
 
 
     def _open(self):
     def _open(self):
         self._exec_helper_cmd()
         self._exec_helper_cmd()
+        self._sender_evt = eventlet.spawn(
+            self._sender)
+        for _ in range(self._encoder_cnt):
+            self._encoder_evt.append(
+                eventlet.spawn(self._encoder))
 
 
     def seek(self, pos):
     def seek(self, pos):
         self._offset = pos
         self._offset = pos
@@ -227,15 +242,83 @@ class SSHBackupWriterImpl(BaseBackupWriterImpl):
     def truncate(self, size):
     def truncate(self, size):
         pass
         pass
 
 
+    def _sender(self):
+        while True:
+            data = self._sender_q.get()
+            try:
+                self._send_msg(data)
+            except Exception as err:
+                self._exception = err
+                raise
+            finally:
+                self._sender_q.task_done()
+
+    def _encoder(self):
+        while True:
+            payload = self._enc_q.get()
+            try:
+                data = self._encode_data(
+                    payload["data"],
+                    payload["offset"],
+                    payload["msg_id"])
+                self._sender_q.put(data)
+            except Exception as err:
+                self._exception = err
+                raise
+            finally:
+                self._enc_q.task_done()
+
     def write(self, data):
     def write(self, data):
-        self._send_msg(self._encode_data(data))
+        if self._closing:
+            raise exception.CoriolisException(
+                "Attempted to write to a closed writer.")
+
+        if self._exception:
+            raise exception.CoriolisException(
+                    "Failed to write data. See log "
+                    "for details.") from self._exception
+
+        payload = {
+            "offset": self._offset,
+            "data": data,
+            "msg_id": self._msg_id,
+        }
+        self._enc_q.put(payload)
         self._offset += len(data)
         self._offset += len(data)
+        self._msg_id += 1
+
+    def _wait_for_queues(self):
+        LOG.info("Waiting for unfinished transfers to complete")
+        timeout = datetime.datetime.now() + datetime.timedelta(seconds=600)
+        while (self._enc_q.unfinished_tasks or
+               self._sender_q.unfinished_tasks) and not self._exception:
+            time.sleep(0.5)
+            now = datetime.datetime.now()
+            if now >= timeout:
+                raise exception.CoriolisException(
+                    "Timed out waiting for data transfer to finish")
 
 
     def close(self):
     def close(self):
+        self._closing = True
+        self._wait_for_queues()
+        if self._exception:
+            # We can raise here. Any SSH socket cleanup will happen
+            # in _handle_exception()
+            raise exception.CoriolisException(
+                "Exception occurred during data transfer. "
+                "Check logs for more details.") from self._exception
+
         if self._ssh:
         if self._ssh:
             self._send_msg(self._encode_eod())
             self._send_msg(self._encode_eod())
             self._ssh.close()
             self._ssh.close()
             self._ssh = None
             self._ssh = None
+        if self._sender_evt:
+            eventlet.kill(self._sender_evt)
+            self._sender_evt = None
+
+        for i in self._encoder_evt:
+            eventlet.kill(i)
+        self._encoder_evt = []
 
 
     def _handle_exception(self, ex):
     def _handle_exception(self, ex):
         super(SSHBackupWriterImpl, self)._handle_exception(ex)
         super(SSHBackupWriterImpl, self)._handle_exception(ex)
@@ -416,7 +499,7 @@ class HTTPBackupWriterImpl(BaseBackupWriterImpl):
         if self._compressor_count is None or self._compressor_count == 0:
         if self._compressor_count is None or self._compressor_count == 0:
             self._compressor_count = 1
             self._compressor_count = 1
         self._compressor_evt = []
         self._compressor_evt = []
-        for i in range(self._compressor_count):
+        for _ in range(self._compressor_count):
             self._compressor_evt.append(
             self._compressor_evt.append(
                 eventlet.spawn(self._compressor))
                 eventlet.spawn(self._compressor))