Kaynağa Gözat

Merge pull request #104 from gabriel-samfira/add-ssh-writer-queueing

Use queueing for SSH backup writer
Nashwan Azhari 6 yıl önce
ebeveyn
işleme
97c1e83d11

+ 1 - 0
coriolis/data_transfer.py

@@ -30,6 +30,7 @@ CONF = cfg.CONF
 CONF.register_opts(compressor_opts)
 
 LOG = logging.getLogger(__name__)
+
 _COMPRESS_FUNC = {
     constants.COMPRESSION_FORMAT_GZIP: gzip.compress,
     constants.COMPRESSION_FORMAT_ZLIB: zlib.compress,

+ 90 - 7
coriolis/providers/backup_writers.py

@@ -3,6 +3,7 @@
 
 import abc
 import contextlib
+import datetime
 import errno
 import os
 import tempfile
@@ -162,13 +163,22 @@ class FileBackupWriter(BaseBackupWriter):
 
 
 class SSHBackupWriterImpl(BaseBackupWriterImpl):
-    def __init__(self, path, disk_id, compress_transfer=None):
+    def __init__(self, path, disk_id, compress_transfer=None,
+                 encoder_count=3):
         self._msg_id = None
         self._stdin = None
         self._stdout = None
         self._stderr = None
         self._offset = None
         self._ssh = None
+        self._sender_q = eventlet.Queue(maxsize=5)
+        self._enc_q = eventlet.Queue(maxsize=5)
+        self._sender_evt = None
+        self._encoder_evt = []
+        self._encoder_cnt = encoder_count
+        self._exception = None
+        self._closing = False
+
         self._compress_transfer = compress_transfer
         if self._compress_transfer is None:
             self._compress_transfer = CONF.compress_transfers
@@ -184,10 +194,10 @@ class SSHBackupWriterImpl(BaseBackupWriterImpl):
         self._stdin, self._stdout, self._stderr = self._ssh.exec_command(
             "chmod +x write_data && sudo ./write_data")
 
-    def _encode_data(self, content):
+    def _encode_data(self, content, offset, msg_id):
         msg = data_transfer.encode_data(
-            self._msg_id, self._path,
-            self._offset, content,
+            msg_id, self._path,
+            offset, content,
             compress=self._compress_transfer)
 
         LOG.debug(
@@ -213,13 +223,18 @@ class SSHBackupWriterImpl(BaseBackupWriterImpl):
                 raise exception.CoriolisException(
                     "write_data exited with error code %r (%s)" % (
                         ret_val, _WRITER_ERR_MAP.get(int(ret_val))))
-        self._msg_id += 1
+
         self._stdin.write(data)
         self._stdin.flush()
         self._stdout.read(4)
 
     def _open(self):
         self._exec_helper_cmd()
+        self._sender_evt = eventlet.spawn(
+            self._sender)
+        for _ in range(self._encoder_cnt):
+            self._encoder_evt.append(
+                eventlet.spawn(self._encoder))
 
     def seek(self, pos):
         self._offset = pos
@@ -227,15 +242,83 @@ class SSHBackupWriterImpl(BaseBackupWriterImpl):
     def truncate(self, size):
         pass
 
+    def _sender(self):
+        while True:
+            data = self._sender_q.get()
+            try:
+                self._send_msg(data)
+            except Exception as err:
+                self._exception = err
+                raise
+            finally:
+                self._sender_q.task_done()
+
+    def _encoder(self):
+        while True:
+            payload = self._enc_q.get()
+            try:
+                data = self._encode_data(
+                    payload["data"],
+                    payload["offset"],
+                    payload["msg_id"])
+                self._sender_q.put(data)
+            except Exception as err:
+                self._exception = err
+                raise
+            finally:
+                self._enc_q.task_done()
+
     def write(self, data):
-        self._send_msg(self._encode_data(data))
+        if self._closing:
+            raise exception.CoriolisException(
+                "Attempted to write to a closed writer.")
+
+        if self._exception:
+            raise exception.CoriolisException(
+                    "Failed to write data. See log "
+                    "for details.") from self._exception
+
+        payload = {
+            "offset": self._offset,
+            "data": data,
+            "msg_id": self._msg_id,
+        }
+        self._enc_q.put(payload)
         self._offset += len(data)
+        self._msg_id += 1
+
+    def _wait_for_queues(self):
+        LOG.info("Waiting for unfinished transfers to complete")
+        timeout = datetime.datetime.now() + datetime.timedelta(seconds=600)
+        while (self._enc_q.unfinished_tasks or
+               self._sender_q.unfinished_tasks) and not self._exception:
+            time.sleep(0.5)
+            now = datetime.datetime.now()
+            if now >= timeout:
+                raise exception.CoriolisException(
+                    "Timed out waiting for data transfer to finish")
 
     def close(self):
+        self._closing = True
+        self._wait_for_queues()
+        if self._exception:
+            # We can raise here. Any SSH socket cleanup will happen
+            # in _handle_exception()
+            raise exception.CoriolisException(
+                "Exception occurred during data transfer. "
+                "Check logs for more details.") from self._exception
+
         if self._ssh:
             self._send_msg(self._encode_eod())
             self._ssh.close()
             self._ssh = None
+        if self._sender_evt:
+            eventlet.kill(self._sender_evt)
+            self._sender_evt = None
+
+        for i in self._encoder_evt:
+            eventlet.kill(i)
+        self._encoder_evt = []
 
     def _handle_exception(self, ex):
         super(SSHBackupWriterImpl, self)._handle_exception(ex)
@@ -416,7 +499,7 @@ class HTTPBackupWriterImpl(BaseBackupWriterImpl):
         if self._compressor_count is None or self._compressor_count == 0:
             self._compressor_count = 1
         self._compressor_evt = []
-        for i in range(self._compressor_count):
+        for _ in range(self._compressor_count):
             self._compressor_evt.append(
                 eventlet.spawn(self._compressor))