|
|
@@ -8,13 +8,13 @@ import copy
|
|
|
import datetime
|
|
|
import errno
|
|
|
import os
|
|
|
+import queue
|
|
|
import shutil
|
|
|
import tempfile
|
|
|
import threading
|
|
|
import time
|
|
|
import uuid
|
|
|
|
|
|
-import eventlet
|
|
|
from oslo_config import cfg
|
|
|
from oslo_log import log as logging
|
|
|
import paramiko
|
|
|
@@ -284,13 +284,18 @@ class SSHBackupWriterImpl(BaseBackupWriterImpl):
|
|
|
self._stderr = None
|
|
|
self._offset = None
|
|
|
self._ssh = None
|
|
|
- self._sender_q = eventlet.Queue(maxsize=5)
|
|
|
- self._enc_q = eventlet.Queue(maxsize=5)
|
|
|
- self._sender_evt = None
|
|
|
- self._encoder_evt = []
|
|
|
+ self._sender_q = queue.Queue(maxsize=5)
|
|
|
+ self._enc_q = queue.Queue(maxsize=5)
|
|
|
+ self._sender_thread = None
|
|
|
+ self._encoder_threads = []
|
|
|
self._encoder_cnt = encoder_count
|
|
|
self._exception = None
|
|
|
+ # Stop sequence:
|
|
|
+ # * once "_closing" is set, we no longer accept writes. We wait for
|
|
|
+ # the queues to be emptied and then we set "_stopped".
|
|
|
+ # * once "_stopped" is set, the worker loops will exit.
|
|
|
self._closing = False
|
|
|
+ self._stopped = False
|
|
|
|
|
|
self._compress_transfer = compress_transfer
|
|
|
if self._compress_transfer is None:
|
|
|
@@ -343,11 +348,10 @@ class SSHBackupWriterImpl(BaseBackupWriterImpl):
|
|
|
|
|
|
def _open(self):
|
|
|
self._exec_helper_cmd()
|
|
|
- self._sender_evt = eventlet.spawn(
|
|
|
- self._sender)
|
|
|
+ self._sender_thread = utils.start_thread(self._sender)
|
|
|
for _ in range(self._encoder_cnt):
|
|
|
- self._encoder_evt.append(
|
|
|
- eventlet.spawn(self._encoder))
|
|
|
+ self._encoder_threads.append(
|
|
|
+ utils.start_thread(self._encoder))
|
|
|
|
|
|
def seek(self, pos):
|
|
|
self._offset = pos
|
|
|
@@ -356,8 +360,11 @@ class SSHBackupWriterImpl(BaseBackupWriterImpl):
|
|
|
pass
|
|
|
|
|
|
def _sender(self):
|
|
|
- while True:
|
|
|
- data = self._sender_q.get()
|
|
|
+ while not self._stopped:
|
|
|
+ try:
|
|
|
+ data = self._sender_q.get(timeout=2)
|
|
|
+ except queue.Empty:
|
|
|
+ continue
|
|
|
try:
|
|
|
self._send_msg(data)
|
|
|
except BaseException as err:
|
|
|
@@ -368,8 +375,11 @@ class SSHBackupWriterImpl(BaseBackupWriterImpl):
|
|
|
del data
|
|
|
|
|
|
def _encoder(self):
|
|
|
- while True:
|
|
|
- payload = self._enc_q.get()
|
|
|
+ while not self._stopped:
|
|
|
+ try:
|
|
|
+ payload = self._enc_q.get(timeout=2)
|
|
|
+ except queue.Empty:
|
|
|
+ continue
|
|
|
try:
|
|
|
data = self._encode_data(
|
|
|
payload["data"],
|
|
|
@@ -415,6 +425,7 @@ class SSHBackupWriterImpl(BaseBackupWriterImpl):
|
|
|
def close(self):
|
|
|
self._closing = True
|
|
|
self._wait_for_queues()
|
|
|
+ self._stopped = True
|
|
|
if self._exception:
|
|
|
# We can raise here. Any SSH socket cleanup will happen
|
|
|
# in _handle_exception()
|
|
|
@@ -427,13 +438,15 @@ class SSHBackupWriterImpl(BaseBackupWriterImpl):
|
|
|
self._ssh.exec_command("sudo sync")
|
|
|
self._ssh.close()
|
|
|
self._ssh = None
|
|
|
- if self._sender_evt:
|
|
|
- eventlet.kill(self._sender_evt)
|
|
|
- self._sender_evt = None
|
|
|
+ if self._sender_thread:
|
|
|
+ LOG.debug("Joining sender thread.")
|
|
|
+ self._sender_thread.join()
|
|
|
+ self._sender_thread = None
|
|
|
|
|
|
- for i in self._encoder_evt:
|
|
|
- eventlet.kill(i)
|
|
|
- self._encoder_evt = []
|
|
|
+ for i in self._encoder_threads:
|
|
|
+ LOG.debug("Joining encoder thread.")
|
|
|
+ i.join()
|
|
|
+ self._encoder_threads = []
|
|
|
|
|
|
def _handle_exception(self, ex):
|
|
|
super(SSHBackupWriterImpl, self)._handle_exception(ex)
|
|
|
@@ -566,16 +579,21 @@ class HTTPBackupWriterImpl(BaseBackupWriterImpl):
|
|
|
self._crt = None
|
|
|
self._key = None
|
|
|
self._ca = None
|
|
|
+ # Stop sequence:
|
|
|
+ # * once "_closing" is set, we no longer accept writes. We wait for
|
|
|
+ # the queues to be emptied and then we set "_stopped".
|
|
|
+ # * once "_stopped" is set, the worker loops will exit.
|
|
|
self._closing = False
|
|
|
+ self._stopped = True
|
|
|
self._write_error = False
|
|
|
self._id = None
|
|
|
self._exception = None
|
|
|
self._compressor_count = compressor_count
|
|
|
- self._comp_q = eventlet.Queue(maxsize=5)
|
|
|
- self._sender_q = eventlet.Queue(maxsize=5)
|
|
|
+ self._comp_q = queue.Queue(maxsize=5)
|
|
|
+ self._sender_q = queue.Queue(maxsize=5)
|
|
|
|
|
|
- self._sender_evt = None
|
|
|
- self._compressor_evt = None
|
|
|
+ self._sender_thread = None
|
|
|
+ self._compressor_threads = None
|
|
|
|
|
|
self._compress_transfer = compress_transfer
|
|
|
if self._compress_transfer is None:
|
|
|
@@ -637,13 +655,13 @@ class HTTPBackupWriterImpl(BaseBackupWriterImpl):
|
|
|
self._closing = False
|
|
|
self._init_session()
|
|
|
self._acquire()
|
|
|
- self._sender_evt = eventlet.spawn(self._sender)
|
|
|
+ self._sender_thread = utils.start_thread(self._sender)
|
|
|
if self._compressor_count is None or self._compressor_count == 0:
|
|
|
self._compressor_count = 1
|
|
|
- self._compressor_evt = []
|
|
|
+ self._compressor_threads = []
|
|
|
for _ in range(self._compressor_count):
|
|
|
- self._compressor_evt.append(
|
|
|
- eventlet.spawn(self._compressor))
|
|
|
+ self._compressor_threads.append(
|
|
|
+ utils.start_thread(self._compressor))
|
|
|
|
|
|
def seek(self, pos):
|
|
|
self._offset = pos
|
|
|
@@ -660,8 +678,11 @@ class HTTPBackupWriterImpl(BaseBackupWriterImpl):
|
|
|
return
|
|
|
|
|
|
def _compressor(self):
|
|
|
- while True:
|
|
|
- payload = self._comp_q.get()
|
|
|
+ while not self._stopped:
|
|
|
+ try:
|
|
|
+ payload = self._comp_q.get(timeout=2)
|
|
|
+ except queue.Empty:
|
|
|
+ continue
|
|
|
send_payload = {
|
|
|
"encoding": None,
|
|
|
"offset": payload["offset"],
|
|
|
@@ -683,8 +704,12 @@ class HTTPBackupWriterImpl(BaseBackupWriterImpl):
|
|
|
self._comp_q.task_done()
|
|
|
|
|
|
def _sender(self):
|
|
|
- while True:
|
|
|
+ while not self._stopped:
|
|
|
payload = self._sender_q.get()
|
|
|
+ try:
|
|
|
+ payload = self._sender_q.get(timeout=2)
|
|
|
+ except queue.Empty:
|
|
|
+ continue
|
|
|
offset = copy.copy(payload["offset"])
|
|
|
headers = {
|
|
|
"X-Write-Offset": str(offset),
|
|
|
@@ -856,6 +881,7 @@ class HTTPBackupWriterImpl(BaseBackupWriterImpl):
|
|
|
def close(self):
|
|
|
self._closing = True
|
|
|
self._wait_for_queues()
|
|
|
+ self._stopped = True
|
|
|
if self._exception:
|
|
|
# There was an exception while writing. We still need to
|
|
|
# release the disk.
|
|
|
@@ -870,13 +896,15 @@ class HTTPBackupWriterImpl(BaseBackupWriterImpl):
|
|
|
if self._session:
|
|
|
self._session.close()
|
|
|
self._session = None
|
|
|
- if self._sender_evt:
|
|
|
- eventlet.kill(self._sender_evt)
|
|
|
- self._sender_evt = None
|
|
|
- if self._compressor_evt:
|
|
|
- for i in self._compressor_evt:
|
|
|
- eventlet.kill(i)
|
|
|
- self._compressor_evt = None
|
|
|
+ if self._sender_thread:
|
|
|
+ LOG.debug("Joining sender thread.")
|
|
|
+ self._sender_thread.join()
|
|
|
+ self._sender_thread = None
|
|
|
+ if self._compressor_threads:
|
|
|
+ for i in self._compressor_threads:
|
|
|
+ LOG.debug("Joining compressor thread.")
|
|
|
+ i.join()
|
|
|
+ self._compressor_threads = None
|
|
|
|
|
|
|
|
|
class HTTPBackupWriterBootstrapper(object):
|