2
0
Эх сурвалжийг харах

Add compressor support

This adds the option to use an external, multi-threaded service
for compression.
Gabriel Adrian Samfira 6 жил өмнө
parent
commit
4d5a7fe56e

+ 8 - 0
coriolis/constants.py

@@ -104,3 +104,11 @@ OS_TYPE_UNKNOWN = "unknown"
 DEFAULT_OS_TYPE = OS_TYPE_LINUX
 
 TMP_DIRS_KEY = "__tmp_dirs"
+
+# Names of the supported compression formats.
+COMPRESSION_FORMAT_GZIP = "gzip"
+COMPRESSION_FORMAT_ZLIB = "zlib"
+
+# Formats accepted as valid; a requested format is expected to be one of
+# these.
+VALID_COMPRESSION_FORMATS = [
+    COMPRESSION_FORMAT_GZIP,
+    COMPRESSION_FORMAT_ZLIB
+]

+ 92 - 11
coriolis/data_transfer.py

@@ -1,12 +1,100 @@
 # Copyright 2016 Cloudbase Solutions Srl
 # All Rights Reserved.
 
+import gzip
+import os
+import stat
 import struct
 import zlib
 
+import requests
+import requests_unixsocket
+
+from urllib import parse
+from oslo_config import cfg
 from oslo_log import log as logging
 
+from coriolis import constants
+from coriolis import exception
+
+compressor_opts = [
+    cfg.StrOpt('compressor_address',
+               default=None,
+               help='Compressor address. If set, all gzip/zlib compression '
+                    'will be done through this service. This value can be '
+                    'either a unix socket path (/var/run/compressor.sock '
+                    'or an IP:PORT.'),
+]
+
+CONF = cfg.CONF
+CONF.register_opts(compressor_opts)
+
 LOG = logging.getLogger(__name__)
+_COMPRESS_FUNC = {
+    constants.COMPRESSION_FORMAT_GZIP: gzip.compress,
+    constants.COMPRESSION_FORMAT_ZLIB: zlib.compress,
+}
+
+
+def _get_session_and_address():
+    if not CONF.compressor_address:
+        return None, None
+
+    if CONF.compressor_address.startswith("/"):
+        # unix socket
+        if os.path.exists(CONF.compressor_address):
+            mode = os.stat(CONF.compressor_address).st_mode
+            if stat.S_ISSOCK(mode):
+                return (requests_unixsocket.Session(),
+                        "http+unix://%s/" % parse.quote_plus(
+                            CONF.compressor_address))
+            else:
+                raise exception.CoriolisException(
+                    "compressor_address is not a valid unix socket")
+        else:
+            raise exception.CoriolisException(
+                "compressor_address is not a valid unix socket")
+    return (requests.Session(), "http://%s/" % CONF.compressor_address)
+
+
+def compression_proxy(content, fmt):
+    if fmt not in constants.VALID_COMPRESSION_FORMATS:
+        raise exception.CoriolisException(
+            "Invalid compression format requested: %s" % fmt)
+    data = content
+    sess, url = _get_session_and_address()
+    if None in (sess, url):
+        compressed_data = _COMPRESS_FUNC[fmt](data)
+    else:
+        try:
+            headers = {
+                "X-Compression-Format": fmt,
+            }
+            ret = sess.post(url, data=data, headers=headers)
+            ret.raise_for_status()
+            compressed_data = ret.content
+        except Exception as err:
+            LOG.error(
+                "failed to compress using coriolis-compressor: %s" % err)
+            LOG.info("falling back to built-in compressor")
+            compressed_data = _COMPRESS_FUNC[fmt](content)
+        finally:
+            sess.close()
+
+    data_len = len(compressed_data)
+    data_len_inflated = len(data)
+    compression_saving = 100.0 * (1 - float(data_len) / data_len_inflated)
+    LOG.debug("Compression space saving: {:.02f}%".format(
+        compression_saving))
+
+    if data_len >= data_len_inflated:
+        # No advantage in sending the compressed data
+        LOG.debug("Ignoring compression, not worth")
+        compress = False
+    else:
+        data = compressed_data
+        compress = True
+    return data, compress
 
 
 def encode_data(msg_id, path, offset, content, compress=True):
@@ -16,20 +104,13 @@ def encode_data(msg_id, path, offset, content, compress=True):
 
     data_len_inflated = len(inflated_content)
 
+    compressed = False
     if compress:
-        data_content = zlib.compress(inflated_content)
+        data_content, compressed = compression_proxy(
+            inflated_content, constants.COMPRESSION_FORMAT_ZLIB)
         data_len = len(data_content)
 
-        compression_saving = 100.0 * (1 - float(data_len) / data_len_inflated)
-        LOG.debug("Compression space saving: {:.02f}%".format(
-            compression_saving))
-
-        if data_len >= data_len_inflated:
-            # No advantage in sending the compressed data
-            LOG.debug("Ignoring compression, not worth")
-            compress = False
-
-    if not compress:
+    if not compressed:
         data_len = data_len_inflated
         data_len_inflated = 0
         data_content = inflated_content

+ 1 - 0
requirements.txt

@@ -34,3 +34,4 @@ strict-rfc3339
 sqlalchemy
 webob
 sshtunnel
+requests-unixsocket