data_transfer.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. # Copyright 2016 Cloudbase Solutions Srl
  2. # All Rights Reserved.
  3. import gzip
  4. import os
  5. import stat
  6. import struct
  7. import zlib
  8. import requests
  9. import requests_unixsocket
  10. from urllib import parse
  11. from oslo_config import cfg
  12. from oslo_log import log as logging
  13. from coriolis import constants
  14. from coriolis import exception
  15. compressor_opts = [
  16. cfg.StrOpt('compressor_address',
  17. default=None,
  18. help='Compressor address. If set, all gzip/zlib compression '
  19. 'will be done through this service. This value can be '
  20. 'either a unix socket path (/var/run/compressor.sock '
  21. 'or an IP:PORT.'),
  22. ]
  23. CONF = cfg.CONF
  24. CONF.register_opts(compressor_opts)
  25. LOG = logging.getLogger(__name__)
  26. _COMPRESS_FUNC = {
  27. constants.COMPRESSION_FORMAT_GZIP: gzip.compress,
  28. constants.COMPRESSION_FORMAT_ZLIB: zlib.compress,
  29. }
  30. def _get_session_and_address():
  31. if not CONF.compressor_address:
  32. return None, None
  33. if CONF.compressor_address.startswith("/"):
  34. # unix socket
  35. if os.path.exists(CONF.compressor_address):
  36. mode = os.stat(CONF.compressor_address).st_mode
  37. if stat.S_ISSOCK(mode):
  38. return (requests_unixsocket.Session(),
  39. "http+unix://%s/" % parse.quote_plus(
  40. CONF.compressor_address))
  41. else:
  42. raise exception.CoriolisException(
  43. "compressor_address is not a valid unix socket")
  44. else:
  45. raise exception.CoriolisException(
  46. "compressor_address is not a valid unix socket")
  47. return (requests.Session(), "http://%s/" % CONF.compressor_address)
  48. def compression_proxy(content, fmt):
  49. if fmt not in constants.VALID_COMPRESSION_FORMATS:
  50. raise exception.CoriolisException(
  51. "Invalid compression format requested: %s" % fmt)
  52. data = content
  53. sess, url = _get_session_and_address()
  54. if None in (sess, url):
  55. compressed_data = _COMPRESS_FUNC[fmt](data)
  56. else:
  57. try:
  58. headers = {
  59. "X-Compression-Format": fmt,
  60. }
  61. ret = sess.post(url, data=data, headers=headers)
  62. ret.raise_for_status()
  63. compressed_data = ret.content
  64. except Exception as err:
  65. LOG.exception(
  66. "failed to compress using coriolis-compressor: %s" % err)
  67. LOG.info("falling back to built-in compressor")
  68. compressed_data = _COMPRESS_FUNC[fmt](content)
  69. finally:
  70. sess.close()
  71. data_len = len(compressed_data)
  72. data_len_inflated = len(data)
  73. compression_saving = 100.0 * (1 - float(data_len) / data_len_inflated)
  74. LOG.debug("Compression space saving: {:.02f}%".format(
  75. compression_saving))
  76. if data_len >= data_len_inflated:
  77. # No advantage in sending the compressed data
  78. compress = False
  79. else:
  80. data = compressed_data
  81. compress = True
  82. return data, compress
  83. def encode_data(msg_id, path, offset, content, compress=True):
  84. inflated_content = (path.encode() + b'\0' +
  85. struct.pack("<Q", offset) +
  86. content)
  87. data_len_inflated = len(inflated_content)
  88. compressed = False
  89. if compress:
  90. data_content, compressed = compression_proxy(
  91. inflated_content, constants.COMPRESSION_FORMAT_ZLIB)
  92. data_len = len(data_content)
  93. if not compressed:
  94. data_len = data_len_inflated
  95. data_len_inflated = 0
  96. data_content = inflated_content
  97. return (struct.pack("<I", msg_id) +
  98. struct.pack("<I", data_len) +
  99. struct.pack("<I", data_len_inflated) +
  100. data_content)
  101. def encode_eod(msg_id):
  102. return struct.pack("<I", msg_id) + struct.pack("<I", 0)