Sfoglia il codice sorgente

Use queueing for SSH backup writer

This should speed up the transfer speed when using the SSH backup
writer. Currently,there are 3 operations that happen:

1) Read from the source
2) Compress and encode payload
3) Send payload to destination

These 3 operations run in sequence for each chunk, which means that
while we are reading from the source, we are not compressing or
sending. While we are compressing and encoding, we are not reading
from the source or sending to destination, etc. Using queues allows
us to do more work in parallel.
Gabriel-Adrian Samfira 6 anni fa
parent
commit
e62be72680
2 ha cambiato i file con 91 aggiunte e 7 eliminazioni
  1. 1 0
      coriolis/data_transfer.py
  2. 90 7
      coriolis/providers/backup_writers.py

+ 1 - 0
coriolis/data_transfer.py

@@ -30,6 +30,7 @@ CONF = cfg.CONF
 CONF.register_opts(compressor_opts)
 
 LOG = logging.getLogger(__name__)
+
 _COMPRESS_FUNC = {
     constants.COMPRESSION_FORMAT_GZIP: gzip.compress,
     constants.COMPRESSION_FORMAT_ZLIB: zlib.compress,

+ 90 - 7
coriolis/providers/backup_writers.py

@@ -3,6 +3,7 @@
 
 import abc
 import contextlib
+import datetime
 import errno
 import os
 import tempfile
@@ -162,13 +163,22 @@ class FileBackupWriter(BaseBackupWriter):
 
 
 class SSHBackupWriterImpl(BaseBackupWriterImpl):
-    def __init__(self, path, disk_id, compress_transfer=None):
+    def __init__(self, path, disk_id, compress_transfer=None,
+                 encoder_count=3):
         self._msg_id = None
         self._stdin = None
         self._stdout = None
         self._stderr = None
         self._offset = None
         self._ssh = None
+        self._sender_q = eventlet.Queue(maxsize=5)
+        self._enc_q = eventlet.Queue(maxsize=5)
+        self._sender_evt = None
+        self._encoder_evt = []
+        self._encoder_cnt = encoder_count
+        self._exception = None
+        self._closing = False
+
         self._compress_transfer = compress_transfer
         if self._compress_transfer is None:
             self._compress_transfer = CONF.compress_transfers
@@ -184,10 +194,10 @@ class SSHBackupWriterImpl(BaseBackupWriterImpl):
         self._stdin, self._stdout, self._stderr = self._ssh.exec_command(
             "chmod +x write_data && sudo ./write_data")
 
-    def _encode_data(self, content):
+    def _encode_data(self, content, offset, msg_id):
         msg = data_transfer.encode_data(
-            self._msg_id, self._path,
-            self._offset, content,
+            msg_id, self._path,
+            offset, content,
             compress=self._compress_transfer)
 
         LOG.debug(
@@ -213,13 +223,18 @@ class SSHBackupWriterImpl(BaseBackupWriterImpl):
                 raise exception.CoriolisException(
                     "write_data exited with error code %r (%s)" % (
                         ret_val, _WRITER_ERR_MAP.get(int(ret_val))))
-        self._msg_id += 1
+
         self._stdin.write(data)
         self._stdin.flush()
         self._stdout.read(4)
 
     def _open(self):
         self._exec_helper_cmd()
+        self._sender_evt = eventlet.spawn(
+            self._sender)
+        for _ in range(self._encoder_cnt):
+            self._encoder_evt.append(
+                eventlet.spawn(self._encoder))
 
     def seek(self, pos):
         self._offset = pos
@@ -227,15 +242,83 @@ class SSHBackupWriterImpl(BaseBackupWriterImpl):
     def truncate(self, size):
         pass
 
+    def _sender(self):
+        while True:
+            data = self._sender_q.get()
+            try:
+                self._send_msg(data)
+            except Exception as err:
+                self._exception = err
+                raise
+            finally:
+                self._sender_q.task_done()
+
+    def _encoder(self):
+        while True:
+            payload = self._enc_q.get()
+            try:
+                data = self._encode_data(
+                    payload["data"],
+                    payload["offset"],
+                    payload["msg_id"])
+                self._sender_q.put(data)
+            except Exception as err:
+                self._exception = err
+                raise
+            finally:
+                self._enc_q.task_done()
+
     def write(self, data):
-        self._send_msg(self._encode_data(data))
+        if self._closing:
+            raise exception.CoriolisException(
+                "Attempted to write to a closed writer.")
+
+        if self._exception:
+            raise exception.CoriolisException(
+                    "Failed to write data. See log "
+                    "for details.") from self._exception
+
+        payload = {
+            "offset": self._offset,
+            "data": data,
+            "msg_id": self._msg_id,
+        }
+        self._enc_q.put(payload)
         self._offset += len(data)
+        self._msg_id += 1
+
+    def _wait_for_queues(self):
+        LOG.info("Waiting for unfinished transfers to complete")
+        timeout = datetime.datetime.now() + datetime.timedelta(seconds=600)
+        while (self._enc_q.unfinished_tasks or
+               self._sender_q.unfinished_tasks) and not self._exception:
+            time.sleep(0.5)
+            now = datetime.datetime.now()
+            if now >= timeout:
+                raise exception.CoriolisException(
+                    "Timed out waiting for data transfer to finish")
 
     def close(self):
+        self._closing = True
+        self._wait_for_queues()
+        if self._exception:
+            # We can raise here. Any SSH socket cleanup will happen
+            # in _handle_exception()
+            raise exception.CoriolisException(
+                "Exception occurred during data transfer. "
+                "Check logs for more details.") from self._exception
+
         if self._ssh:
             self._send_msg(self._encode_eod())
             self._ssh.close()
             self._ssh = None
+        if self._sender_evt:
+            eventlet.kill(self._sender_evt)
+            self._sender_evt = None
+
+        for i in self._encoder_evt:
+            eventlet.kill(i)
+        self._encoder_evt = []
 
     def _handle_exception(self, ex):
         super(SSHBackupWriterImpl, self)._handle_exception(ex)
@@ -416,7 +499,7 @@ class HTTPBackupWriterImpl(BaseBackupWriterImpl):
         if self._compressor_count is None or self._compressor_count == 0:
             self._compressor_count = 1
         self._compressor_evt = []
-        for i in range(self._compressor_count):
+        for _ in range(self._compressor_count):
             self._compressor_evt.append(
                 eventlet.spawn(self._compressor))