|
|
@@ -360,25 +360,31 @@ class SSHBackupWriterImpl(BaseBackupWriterImpl):
|
|
|
pass
|
|
|
|
|
|
def _sender(self):
|
|
|
+ LOG.debug("Backup sender started.")
|
|
|
while not self._stopped:
|
|
|
try:
|
|
|
data = self._sender_q.get(timeout=2)
|
|
|
except queue.Empty:
|
|
|
+ LOG.debug("Backup sender queue empty.")
|
|
|
continue
|
|
|
try:
|
|
|
self._send_msg(data)
|
|
|
except BaseException as err:
|
|
|
+ LOG.error("Backup sender failed.")
|
|
|
self._exception = err
|
|
|
raise
|
|
|
finally:
|
|
|
self._sender_q.task_done()
|
|
|
del data
|
|
|
+ LOG.debug("Backup sender stopped.")
|
|
|
|
|
|
def _encoder(self):
|
|
|
+ LOG.debug("Backup encoder started.")
|
|
|
while not self._stopped:
|
|
|
try:
|
|
|
payload = self._enc_q.get(timeout=2)
|
|
|
except queue.Empty:
|
|
|
+ LOG.debug("Backup encoder queue empty.")
|
|
|
continue
|
|
|
try:
|
|
|
data = self._encode_data(
|
|
|
@@ -387,10 +393,12 @@ class SSHBackupWriterImpl(BaseBackupWriterImpl):
|
|
|
payload["msg_id"])
|
|
|
self._sender_q.put(data)
|
|
|
except BaseException as err:
|
|
|
+ LOG.error("Backup encoder failed.")
|
|
|
self._exception = err
|
|
|
raise
|
|
|
finally:
|
|
|
self._enc_q.task_done()
|
|
|
+ LOG.debug("Backup encoder stopped.")
|
|
|
|
|
|
def write(self, data):
|
|
|
if self._closing:
|
|
|
@@ -416,6 +424,9 @@ class SSHBackupWriterImpl(BaseBackupWriterImpl):
|
|
|
timeout = datetime.datetime.now() + datetime.timedelta(seconds=600)
|
|
|
while (self._enc_q.unfinished_tasks or
|
|
|
self._sender_q.unfinished_tasks) and not self._exception:
|
|
|
+ LOG.info("Waiting for unfinished transfers to complete, "
|
|
|
+ f"encoder tasks: {self._enc_q.unfinished_tasks}, "
|
|
|
+ f"sender tasks: {self._sender_q.unfinished_tasks}.")
|
|
|
time.sleep(0.5)
|
|
|
now = datetime.datetime.now()
|
|
|
if now >= timeout:
|
|
|
@@ -584,7 +595,7 @@ class HTTPBackupWriterImpl(BaseBackupWriterImpl):
|
|
|
# the queues to be emptied and then we set "_stopped".
|
|
|
# * once "_stopped" is set, the worker loops will exit.
|
|
|
self._closing = False
|
|
|
- self._stopped = True
|
|
|
+ self._stopped = False
|
|
|
self._write_error = False
|
|
|
self._id = None
|
|
|
self._exception = None
|
|
|
@@ -678,10 +689,12 @@ class HTTPBackupWriterImpl(BaseBackupWriterImpl):
|
|
|
return
|
|
|
|
|
|
def _compressor(self):
|
|
|
+ LOG.debug("Backup comperssor started.")
|
|
|
while not self._stopped:
|
|
|
try:
|
|
|
payload = self._comp_q.get(timeout=2)
|
|
|
except queue.Empty:
|
|
|
+ LOG.debug("Backup compressor queue empty.")
|
|
|
continue
|
|
|
send_payload = {
|
|
|
"encoding": None,
|
|
|
@@ -695,20 +708,22 @@ class HTTPBackupWriterImpl(BaseBackupWriterImpl):
|
|
|
if compressed:
|
|
|
send_payload["encoding"] = 'gzip'
|
|
|
except BaseException as err:
|
|
|
- LOG.exception(err)
|
|
|
+ LOG.exception("Backup compressor failure.")
|
|
|
self._exception = err
|
|
|
self._comp_q.task_done()
|
|
|
raise
|
|
|
send_payload["chunk"] = chunk
|
|
|
self._sender_q.put(send_payload)
|
|
|
self._comp_q.task_done()
|
|
|
+ LOG.debug("Backup compressor stopped.")
|
|
|
|
|
|
def _sender(self):
|
|
|
+ LOG.debug("Backup sender started.")
|
|
|
while not self._stopped:
|
|
|
- payload = self._sender_q.get()
|
|
|
try:
|
|
|
payload = self._sender_q.get(timeout=2)
|
|
|
except queue.Empty:
|
|
|
+ LOG.debug("Backup sender queue empty.")
|
|
|
continue
|
|
|
offset = copy.copy(payload["offset"])
|
|
|
headers = {
|
|
|
@@ -749,7 +764,7 @@ class HTTPBackupWriterImpl(BaseBackupWriterImpl):
|
|
|
except BaseException as err:
|
|
|
# record the exception. We need to terminate
|
|
|
# the writer if this is set
|
|
|
- LOG.exception(err)
|
|
|
+ LOG.exception("Backup sender failed.")
|
|
|
self._exception = err
|
|
|
self._sender_q.task_done()
|
|
|
raise
|
|
|
@@ -757,6 +772,7 @@ class HTTPBackupWriterImpl(BaseBackupWriterImpl):
|
|
|
del headers
|
|
|
del payload
|
|
|
self._sender_q.task_done()
|
|
|
+ LOG.debug("Backup sender stopped.")
|
|
|
|
|
|
@utils.retry_on_error()
|
|
|
def write(self, data):
|
|
|
@@ -775,11 +791,18 @@ class HTTPBackupWriterImpl(BaseBackupWriterImpl):
|
|
|
self._offset += len(data)
|
|
|
|
|
|
def _wait_for_queues(self):
|
|
|
+ timeout = datetime.datetime.now() + datetime.timedelta(seconds=600)
|
|
|
while (self._comp_q.unfinished_tasks or
|
|
|
self._sender_q.unfinished_tasks) and not self._exception:
|
|
|
# No error recorded, and we have tasks in the queue
|
|
|
- LOG.info("Waiting for unfinished transfers to complete")
|
|
|
+ LOG.info("Waiting for unfinished transfers to complete, "
|
|
|
+ f"compressor tasks: {self._comp_q.unfinished_tasks}, "
|
|
|
+ f"sender tasks: {self._sender_q.unfinished_tasks}.")
|
|
|
time.sleep(0.5)
|
|
|
+ now = datetime.datetime.now()
|
|
|
+ if now >= timeout:
|
|
|
+ raise exception.CoriolisException(
|
|
|
+ "Timed out waiting for data transfer to finish")
|
|
|
|
|
|
def _create_checksum_job(self, algorithm, start_offset=0, end_offset=0):
|
|
|
"""Creates a full-disk checksum job on the writer.
|