Browse Source

Adds replica compressed data transfer

Alessandro Pilotti 9 years ago
parent
commit
ee400679c9
4 changed files with 121 additions and 24 deletions
  1. 44 0
      coriolis/data_transfer.py
  2. 12 10
      coriolis/providers/vmware_vsphere/__init__.py
  3. 1 1
      resources/makefile
  4. 64 13
      resources/write_data.c

+ 44 - 0
coriolis/data_transfer.py

@@ -0,0 +1,44 @@
+# Copyright 2016 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import struct
+import zlib
+
+from oslo_log import log as logging
+
+LOG = logging.getLogger(__name__)
+
+
+def encode_data(msg_id, path, offset, content, compress=True):
+    inflated_content = (path.encode() + b'\0' +
+                        struct.pack("<Q", offset) +
+                        content)
+
+    data_len_inflated = len(inflated_content)
+
+    if compress:
+        data_content = zlib.compress(inflated_content)
+        data_len = len(data_content)
+
+        compression_saving = 100.0 * (1 - float(data_len) / data_len_inflated)
+        LOG.debug("Compression space saving: {:.02f}%".format(
+            compression_saving))
+
+        if data_len >= data_len_inflated:
+            # No advantage in sending the compressed data
+            LOG.debug("Ignoring compression, not worth")
+            compress = False
+
+    if not compress:
+        data_len = data_len_inflated
+        data_len_inflated = 0
+        data_content = inflated_content
+
+    return (struct.pack("<I", msg_id) +
+            struct.pack("<I", data_len) +
+            struct.pack("<I", data_len_inflated) +
+            data_content)
+
+
+def encode_eod(msg_id):
+    return struct.pack("<I", msg_id) + struct.pack("<I", 0)

+ 12 - 10
coriolis/providers/vmware_vsphere/__init__.py

@@ -21,6 +21,7 @@ from pyVim import connect
 from pyVmomi import vim
 from pyVmomi import vim
 
 
 from coriolis import constants
 from coriolis import constants
+from coriolis import data_transfer
 from coriolis import exception
 from coriolis import exception
 from coriolis.providers import base
 from coriolis.providers import base
 from coriolis.providers.vmware_vsphere import guestid
 from coriolis.providers.vmware_vsphere import guestid
@@ -148,19 +149,20 @@ class _SSHBackupWriter(_BaseBackupWriter):
         path = [v for v in self._volumes_info
         path = [v for v in self._volumes_info
                 if v["disk_id"] == self._disk_id][0]["volume_dev"]
                 if v["disk_id"] == self._disk_id][0]["volume_dev"]
 
 
-        LOG.info("Guest path: %s", path)
-        LOG.info("Offset: %s", self._offset)
-        LOG.info("Content len: %s", len(content))
+        msg = data_transfer.encode_data(
+            self._msg_id, path, self._offset, content)
 
 
-        data_len = len(path) + 1 + 8 + len(content)
-        return (struct.pack("<I", self._msg_id) +
-                struct.pack("<I", data_len) +
-                path.encode() + b'\0' +
-                struct.pack("<Q", self._offset) +
-                content)
+        LOG.debug(
+            "Guest path: %(path)s, offset: %(offset)d, content len: "
+            "%(content_len)d, msg len: %(msg_len)d",
+            {"path": path, "offset": self._offset, "content_len": len(content),
+             "msg_len": len(msg)})
+        return msg
 
 
     def _encode_eod(self):
     def _encode_eod(self):
-        return struct.pack("<I", self._msg_id) + struct.pack("<I", 0)
+        msg = data_transfer.encode_eod(self._msg_id)
+        LOG.debug("EOD message len: %d", len(msg))
+        return msg
 
 
     @utils.retry_on_error()
     @utils.retry_on_error()
     def _send_msg(self, data):
     def _send_msg(self, data):

+ 1 - 1
resources/makefile

@@ -1,3 +1,3 @@
 write_data: write_data.c
 write_data: write_data.c
-	gcc -o write_data write_data.c
+	gcc -o write_data write_data.c -lz
 
 

+ 64 - 13
resources/write_data.c

@@ -5,23 +5,26 @@
 #include <stdint.h>
 #include <stdint.h>
 #include <stdlib.h>
 #include <stdlib.h>
 #include <string.h>
 #include <string.h>
+#include <zlib.h>
 
 
 #define MIN_MSG_SIZE (sizeof(uint64_t) + 1)
 #define MIN_MSG_SIZE (sizeof(uint64_t) + 1)
 #define MAX_MSG_SIZE (100 * 1024 * 1024)
 #define MAX_MSG_SIZE (100 * 1024 * 1024)
 
 
-#define ERR_MORE_MSG        -1
-#define ERR_DONE            0
-#define ERR_READ_MSG_SIZE   1
-#define ERR_MSG_SIZE        2
-#define ERR_OPEN_FILE       3
-#define ERR_DATA            4
-#define ERR_IO_OPEN         5
-#define ERR_IO_SEEK         6
-#define ERR_IO_WRITE        7
-#define ERR_IO_CLOSE        8
-#define ERR_NO_MEM          9
-#define ERR_INVALID_ARGS    10
-#define ERR_READ_MSG_ID     11
+#define ERR_MORE_MSG            -1
+#define ERR_DONE                0
+#define ERR_READ_MSG_SIZE       1
+#define ERR_MSG_SIZE            2
+#define ERR_OPEN_FILE           3
+#define ERR_DATA                4
+#define ERR_IO_OPEN             5
+#define ERR_IO_SEEK             6
+#define ERR_IO_WRITE            7
+#define ERR_IO_CLOSE            8
+#define ERR_NO_MEM              9
+#define ERR_INVALID_ARGS        10
+#define ERR_READ_MSG_ID         11
+#define ERR_MSG_SIZE_INFLATED   12
+#define ERR_ZLIB                13
 
 
 int write_msg_id(uint32_t msg_id)
 int write_msg_id(uint32_t msg_id)
 {
 {
@@ -33,6 +36,28 @@ int write_msg_id(uint32_t msg_id)
     return ERR_DONE;
     return ERR_DONE;
 }
 }
 
 
+int inflate_buf(uint32_t msg_size, void* buf, uint32_t msg_size_inflated,
+                void* inflated_buf)
+{
+    z_stream strm;
+    memset(&strm, 0, sizeof(z_stream));
+    int ret = inflateInit(&strm);
+    if (ret != Z_OK)
+        return ERR_ZLIB;
+
+    strm.avail_in = msg_size;
+    strm.next_in = buf;
+    strm.avail_out = msg_size_inflated;
+    strm.next_out = inflated_buf;
+
+    ret = inflate(&strm, Z_FINISH);
+    if(ret != Z_STREAM_END)
+        return ERR_ZLIB;
+
+    inflateEnd(&strm);
+    return ERR_DONE;
+}
+
 int handle_msg(FILE* input_stream)
 int handle_msg(FILE* input_stream)
 {
 {
     uint32_t msg_id = 0;
     uint32_t msg_id = 0;
@@ -54,6 +79,14 @@ int handle_msg(FILE* input_stream)
     if (msg_size < MIN_MSG_SIZE || msg_size > MAX_MSG_SIZE)
     if (msg_size < MIN_MSG_SIZE || msg_size > MAX_MSG_SIZE)
         return ERR_MSG_SIZE;
         return ERR_MSG_SIZE;
 
 
+    uint32_t msg_size_inflated = 0;
+    c = fread(&msg_size_inflated, 1, sizeof(uint32_t), input_stream);
+    if (c != sizeof(uint32_t))
+        return ERR_MSG_SIZE_INFLATED;
+    if (msg_size_inflated != 0 && (msg_size_inflated < MIN_MSG_SIZE ||
+            msg_size_inflated > MAX_MSG_SIZE))
+        return ERR_MSG_SIZE_INFLATED;
+
     unsigned char* buf = (unsigned char*)malloc(msg_size);
     unsigned char* buf = (unsigned char*)malloc(msg_size);
     if (!buf)
     if (!buf)
         return ERR_NO_MEM;
         return ERR_NO_MEM;
@@ -62,6 +95,24 @@ int handle_msg(FILE* input_stream)
     if (c != msg_size)
     if (c != msg_size)
         return ERR_IO_OPEN;
         return ERR_IO_OPEN;
 
 
+    if(msg_size_inflated)
+    {
+        unsigned char* inflated_buf = (unsigned char*)malloc(msg_size_inflated);
+        if (!inflated_buf)
+            return ERR_NO_MEM;
+
+        int err = inflate_buf(msg_size, buf, msg_size_inflated, inflated_buf);
+        if(err != ERR_DONE)
+        {
+            free(inflated_buf);
+            return err;
+        }
+
+        free(buf);
+        buf = inflated_buf;
+        msg_size = msg_size_inflated;
+    }
+
     char* path = (char*)buf;
     char* path = (char*)buf;
     // strlen is unsafe
     // strlen is unsafe
     unsigned char* data = (unsigned char*)memchr(path, '\0', msg_size);
     unsigned char* data = (unsigned char*)memchr(path, '\0', msg_size);