Alessandro Pilotti 8 лет назад
Родитель
Commit
5452457326
4 изменённых файла: 289 добавлений и 40 удалений
  1. 4 0
      coriolis/exception.py
  2. 35 40
      coriolis/migrations/manager.py
  3. 121 0
      coriolis/qemu.py
  4. 129 0
      coriolis/qemu_reader.py

+ 4 - 0
coriolis/exception.py

@@ -310,3 +310,7 @@ class NBDConnectionException(Exception):
 
 class NBDException(Exception):
     pass
+
+
+class QEMUException(Exception):
+    """Raised when a call into libqemu reports a failure."""
+    pass

+ 35 - 40
coriolis/migrations/manager.py

@@ -3,63 +3,56 @@
 
 import eventlet
 import gc
-import uuid
 import sys
 
 from oslo_log import log as logging
 from oslo_utils import units
 
 from coriolis import events
-from coriolis import nbd
+from coriolis import qemu_reader
 from coriolis.providers import backup_writers
 from coriolis import utils
 
 LOG = logging.getLogger(__name__)
 
 
-def _copy_volume(volume, backup_writer, event_manager):
+def _copy_volume(volume, disk_image_reader, backup_writer, event_manager):
     disk_id = volume["disk_id"]
     # for now we assume it is a local file
-    virtual_disk = volume["disk_image_uri"]
-    # just an identifier. We use it to create a socket path
-    # that we pass to qemu-nbd
-    name = str(uuid.uuid4())
+    path = volume["disk_image_uri"]
+    skip_zeroes = volume.get("zeroed", False)
 
     with backup_writer.open("", disk_id) as writer:
-        with nbd.DiskImageReader(virtual_disk, name) as reader:
+        with disk_image_reader.open(path) as reader:
+            disk_size = reader.disk_size
+
             perc_step = event_manager.add_percentage_step(
-                reader.export_size,
+                disk_size,
                 message_format="Disk copy progress for %s: "
                                "{:.0f}%%" % disk_id)
-            chunk = 4096
+
             offset = 0
-            write_offset = 0
-            buff = b''
-            flush = 10 * units.Mi  # 10 MB
-            export_size = reader.export_size
-            while offset < export_size:
-                readBytes = chunk
-                remaining = export_size - offset
-                remainingDelta = remaining - chunk
-                if remainingDelta <= 0:
-                    readBytes = remaining
-
-                if len(buff) == 0:
-                    write_offset = offset
-
-                data = reader.read(offset, readBytes)
-                offset += readBytes
-
-                buff += data
-                if len(buff) >= flush or export_size == offset:
-                    writer.seek(write_offset)
-                    writer.write(buff)
-                    buff = b''
-                    event_manager.set_percentage_step(
-                        perc_step, offset)
-            buff = None
-            data = None
-            gc.collect()
+            max_block_size = 1 * units.Mi  # 1 MB
+
+            while offset < disk_size:
+                allocated, zero_block, block_size = reader.get_block_status(
+                    offset, max_block_size)
+                if not allocated or zero_block and skip_zeroes:
+                    if not allocated:
+                        LOG.debug("Unallocated block detected: %s", block_size)
+                    else:
+                        LOG.debug("Skipping zero block: %s", block_size)
+                    offset += block_size
+                    writer.seek(offset)
+                else:
+                    buf = reader.read(offset, block_size)
+                    writer.write(buf)
+                    offset += len(buf)
+                    buf = None
+                    gc.collect()
+
+                event_manager.set_percentage_step(
+                    perc_step, offset)
 
 
 def _copy_wrapper(job_args):
@@ -72,13 +65,13 @@ def _copy_wrapper(job_args):
 
 def copy_disk_data(target_conn_info, volumes_info, event_handler):
     # TODO (gsamfira): the disk image should be an URI that can either be local
-    # (file://) or remote (https://, ftp://, smb://, nbd:// etc).
+    # (file://) or remote (https://, ftp://, smb://, nfs:// etc).
     # This must happen if we are to implement multi-worker scenarios.
     # In such cases, it is not guaranteed that the disk sync task
     # will be started on the same node onto which the import
     # happened. It may also be conceivable, that wherever the disk
     # image ends up, we might be able to directly expose it using
-    # NBD, iSCSI or any other network protocol. In which case,
+    # NFS, iSCSI or any other network protocol. In which case,
     # we can skip downloading it locally just to sync it.
 
     event_manager = events.EventManager(event_handler)
@@ -93,9 +86,11 @@ def copy_disk_data(target_conn_info, volumes_info, event_handler):
     utils.wait_for_port_connectivity(ip, port)
     backup_writer = backup_writers.SSHBackupWriter(
         ip, port, username, pkey, password, volumes_info)
+    disk_image_reader = qemu_reader.QEMUDiskImageReader()
 
     pool = eventlet.greenpool.GreenPool()
-    job_data = [(vol, backup_writer, event_manager) for vol in volumes_info]
+    job_data = [(vol, disk_image_reader, backup_writer, event_manager)
+                for vol in volumes_info]
     for result, disk_id, error in pool.imap(_copy_wrapper, job_data):
         # TODO (gsamfira): There is no use in letting the other disks finish
         # sync-ing as we don't save the state of the disk sync anywhere (yet).

+ 121 - 0
coriolis/qemu.py

@@ -0,0 +1,121 @@
+# Copyright 2017 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import ctypes
+
+_libqemu = ctypes.CDLL('libqemu.so')
+
+MODULE_INIT_BLOCK = 0
+MODULE_INIT_OPTS = 1
+MODULE_INIT_QOM = 2
+MODULE_INIT_TRACE = 3
+MODULE_INIT_MAX = 4
+
+BDRV_BLOCK_DATA = 1
+BDRV_BLOCK_ZERO = 2
+BDRV_BLOCK_OFFSET_VALID = 4
+BDRV_BLOCK_RAW = 8
+BDRV_BLOCK_ALLOCATED = 0x10
+BDRV_BLOCK_EOF = 0x20
+
+BDRV_SECTOR_BITS = 9
+
+
+class QObject(ctypes.Structure):
+    # ctypes lays the fields out in declaration order using C alignment
+    # rules, so this must match the struct of the loaded libqemu build.
+    # NOTE(review): presumably mirrors QEMU's QObject -- confirm against
+    # the headers of the QEMU version libqemu.so was built from.
+    _fields_ = [("type", ctypes.c_void_p),
+                ("refcnt", ctypes.c_size_t)]
+
+
+class QString(ctypes.Structure):
+    # Embeds a QObject as its first member, followed by the string data.
+    # NOTE(review): presumably mirrors QEMU's QString; field order and
+    # sizes must match the loaded libqemu build -- confirm.
+    _fields_ = [("base", QObject),
+                ("string", ctypes.c_char_p),
+                ("length", ctypes.c_size_t),
+                ("capacity", ctypes.c_size_t)]
+
+
+class Error(ctypes.Structure):
+    # Used through POINTER(POINTER(Error)) out-parameters in the bindings
+    # below; "msg" is exposed as a C string (bytes on the Python side).
+    # NOTE(review): presumably mirrors QEMU's struct Error; field order and
+    # sizes must match the loaded libqemu build -- confirm.
+    _fields_ = [("msg", ctypes.c_char_p),
+                ("err_class", ctypes.c_int),
+                ("src", ctypes.c_char_p),
+                ("func", ctypes.c_char_p),
+                ("line", ctypes.c_int),
+                ("hint", ctypes.c_void_p)]
+
+
+_libqemu.qemu_vfree.argtypes = [ctypes.c_void_p]
+_libqemu.qemu_vfree.restype = None
+qemu_vfree = _libqemu.qemu_vfree
+
+_libqemu.module_call_init.argtypes = [ctypes.c_int]
+_libqemu.module_call_init.restype = None
+module_call_init = _libqemu.module_call_init
+
+_libqemu.qemu_init_exec_dir.argtypes = [ctypes.c_char_p]
+_libqemu.qemu_init_exec_dir.restype = None
+qemu_init_exec_dir = _libqemu.qemu_init_exec_dir
+
+_libqemu.qemu_init_main_loop.argtypes = [ctypes.POINTER(ctypes.POINTER(Error))]
+# The ctypes attribute is "restype"; any other attribute name is just stored
+# on the function object and ignored when the call is made.
+_libqemu.qemu_init_main_loop.restype = ctypes.c_int
+qemu_init_main_loop = _libqemu.qemu_init_main_loop
+
+_libqemu.qcrypto_init.argtypes = [ctypes.POINTER(ctypes.POINTER(Error))]
+# The ctypes attribute is "restype"; any other attribute name is just stored
+# on the function object and ignored when the call is made.
+_libqemu.qcrypto_init.restype = ctypes.c_int
+qcrypto_init = _libqemu.qcrypto_init
+
+_libqemu.error_set_progname.argtypes = [ctypes.c_char_p]
+_libqemu.error_set_progname.restype = None
+error_set_progname = _libqemu.error_set_progname
+
+_libqemu.error_reportf_err.argtypes = [ctypes.POINTER(Error), ctypes.c_char_p]
+# "restype = None" declares a void function; the previous "res_type"
+# attribute was ignored by ctypes, leaving the default C int return type.
+_libqemu.error_reportf_err.restype = None
+error_reportf_err = _libqemu.error_reportf_err
+
+_libqemu.qstring_from_str.argtypes = [ctypes.c_char_p]
+_libqemu.qstring_from_str.restype = ctypes.POINTER(QString)
+qstring_from_str = _libqemu.qstring_from_str
+
+_libqemu.qdict_new.argtypes = []
+# Must be spelled "restype": without it ctypes assumes a C int return value
+# and truncates the returned pointer on 64-bit platforms.
+_libqemu.qdict_new.restype = ctypes.c_void_p
+qdict_new = _libqemu.qdict_new
+
+_libqemu.qdict_put_obj.argtypes = [
+    ctypes.c_void_p, ctypes.c_char_p, ctypes.POINTER(QObject)]
+_libqemu.qdict_put_obj.restype = None
+qdict_put_obj = _libqemu.qdict_put_obj
+
+_libqemu.bdrv_init.argtypes = []
+_libqemu.bdrv_init.restype = None
+bdrv_init = _libqemu.bdrv_init
+
+_libqemu.blk_new_open.argtypes = [
+    ctypes.c_char_p, ctypes.c_char_p, ctypes.c_void_p, ctypes.c_int,
+    ctypes.POINTER(ctypes.POINTER(Error))]
+_libqemu.blk_new_open.restype = ctypes.c_void_p
+blk_new_open = _libqemu.blk_new_open
+
+_libqemu.blk_blockalign.argtypes = [ctypes.c_void_p, ctypes.c_size_t]
+_libqemu.blk_blockalign.restype = ctypes.c_void_p
+blk_blockalign = _libqemu.blk_blockalign
+
+_libqemu.blk_bs.argtypes = [ctypes.c_void_p]
+_libqemu.blk_bs.restype = ctypes.c_void_p
+blk_bs = _libqemu.blk_bs
+
+_libqemu.blk_nb_sectors.argtypes = [ctypes.c_void_p]
+_libqemu.blk_nb_sectors.restype = ctypes.c_int64
+blk_nb_sectors = _libqemu.blk_nb_sectors
+
+_libqemu.blk_pread.argtypes = [
+    ctypes.c_void_p, ctypes.c_int64, ctypes.c_void_p, ctypes.c_int]
+# The ctypes attribute is "restype"; "res_type" was silently ignored.
+_libqemu.blk_pread.restype = ctypes.c_int
+blk_pread = _libqemu.blk_pread
+
+_libqemu.blk_unref.argtypes = [ctypes.c_void_p]
+_libqemu.blk_unref.restype = None
+blk_unref = _libqemu.blk_unref
+
+_libqemu.bdrv_get_block_status_above.argtypes = [
+    ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int64, ctypes.c_int,
+    ctypes.POINTER(ctypes.c_int), ctypes.POINTER(ctypes.c_void_p)]
+_libqemu.bdrv_get_block_status_above.restype = ctypes.c_int64
+bdrv_get_block_status_above = _libqemu.bdrv_get_block_status_above

+ 129 - 0
coriolis/qemu_reader.py

@@ -0,0 +1,129 @@
+# Copyright 2017 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import contextlib
+import ctypes
+
+from coriolis import exception
+from coriolis import qemu
+
+
+class QEMUDiskImageReader(object):
+    """Reads a disk image's data and allocation map through libqemu.
+
+    Use ``open(path)`` as a context manager; the yielded reader exposes
+    ``disk_size``, ``get_block_status()`` and ``read()``. All state for the
+    opened image is stored on the instance and released by ``close()``.
+
+    NOTE(review): because the state lives on the instance, a single
+    instance can only serve one open image at a time -- confirm that
+    callers do not share one reader between concurrent copies.
+    """
+
+    def __init__(self):
+        self._blk = None  # handle returned by blk_new_open
+        self._bs = None  # handle returned by blk_bs for self._blk
+        self._total_sectors = None
+        self._block_driver_state = None
+        self._buf = None  # address of the buffer from blk_blockalign
+        self._buf_size = None
+
+    @staticmethod
+    def _error_message(error, default):
+        """Return the message held by a libqemu ``Error *``.
+
+        ``error`` is a ctypes ``POINTER(Error)``; ``default`` is returned
+        when the pointer is NULL or carries no message.
+        """
+        # A NULL ctypes pointer is falsy; the struct fields are only
+        # reachable through ``.contents``.
+        if error and error.contents.msg:
+            return error.contents.msg.decode("utf-8", "replace")
+        return default
+
+    def close(self):
+        """Free the read buffer and drop the reference to the open image.
+
+        Safe to call when nothing is open.
+        """
+        if self._buf is not None:
+            qemu.qemu_vfree(self._buf)
+            self._buf = None
+        self._buf_size = None
+
+        if self._blk is not None:
+            qemu.blk_unref(self._blk)
+            self._blk = None
+
+        self._bs = None
+        self._total_sectors = None
+        self._block_driver_state = None
+
+    def _qemu_open_path(self, path):
+        """Open the image at ``path`` and cache its handles and size.
+
+        Raises QEMUException if the image cannot be opened or its size
+        cannot be determined.
+        """
+        error = ctypes.POINTER(qemu.Error)()
+
+        options = qemu.qdict_new()
+        blk = qemu.blk_new_open(
+            path.encode(), None, options, 0, ctypes.byref(error))
+        if not blk:
+            raise exception.QEMUException(
+                self._error_message(error, "blk_new_open failed"))
+
+        # Store the handle first so close() releases it if a later step
+        # fails.
+        self._blk = blk
+        self._bs = qemu.blk_bs(blk)
+
+        total_sectors = qemu.blk_nb_sectors(blk)
+        if total_sectors < 0:
+            raise exception.QEMUException("blk_nb_sectors failed")
+        self._total_sectors = total_sectors
+        self._block_driver_state = ctypes.c_void_p()
+
+    @property
+    def disk_size(self):
+        """Size of the open image in bytes."""
+        return self._total_sectors << qemu.BDRV_SECTOR_BITS
+
+    @contextlib.contextmanager
+    def open(self, path):
+        """Context manager: open ``path``, yield this reader, then close."""
+        try:
+            self._qemu_open_path(path)
+            yield self
+        finally:
+            self.close()
+
+    def _get_sectors(self, offset, size):
+        """Convert a byte range into ``(start_sector, num_sectors)``.
+
+        Both values are rounded down to whole sectors and the count is
+        clipped so that it does not run past the end of the image.
+        """
+        start_sector = offset >> qemu.BDRV_SECTOR_BITS
+        return (start_sector,
+                min(self._total_sectors - start_sector,
+                    size >> qemu.BDRV_SECTOR_BITS))
+
+    def get_block_status(self, offset, size):
+        """Return ``(allocated, zero_block, block_size)`` for ``offset``.
+
+        Starting at byte ``offset``, consecutive extents are merged for as
+        long as they report the same allocated/zero flags, up to ``size``
+        bytes. ``block_size`` is the length in bytes of the merged run.
+
+        Raises QEMUException if the status query fails or makes no
+        progress.
+        """
+        start_sector, num_sectors = self._get_sectors(offset, size)
+
+        sectors = 0
+        block_status = None
+        while True:
+            pnum = ctypes.c_int(0)
+            status = qemu.bdrv_get_block_status_above(
+                self._bs, None, start_sector + sectors, num_sectors - sectors,
+                ctypes.byref(pnum), ctypes.byref(self._block_driver_state))
+            if status < 0 or pnum.value == 0:
+                raise exception.QEMUException(
+                    'bdrv_get_block_status_above failed')
+
+            allocated = (status & qemu.BDRV_BLOCK_ALLOCATED) > 0
+            zero_block = (status & qemu.BDRV_BLOCK_ZERO) > 0
+
+            # Stop at the first extent whose flags differ from the run
+            # collected so far; that extent is not counted.
+            if (block_status is not None and
+                    block_status != (allocated, zero_block)):
+                break
+            block_status = (allocated, zero_block)
+
+            sectors += pnum.value
+            if sectors >= num_sectors:
+                break
+
+        block_size = min(num_sectors, sectors) << qemu.BDRV_SECTOR_BITS
+        return block_status + (block_size,)
+
+    def read(self, offset, size):
+        """Read up to ``size`` bytes starting at byte ``offset``.
+
+        The length is rounded down to whole sectors and clipped to the end
+        of the image. Returns a ctypes ``c_ubyte`` array that aliases the
+        reader's internal buffer, so its contents are only valid until the
+        next ``read()`` or ``close()``.
+
+        Raises QEMUException if the underlying read fails.
+        """
+        _, num_sectors = self._get_sectors(offset, size)
+
+        # Grow the buffer only when the request exceeds its current size.
+        if not self._buf_size or self._buf_size < size:
+            if self._buf is not None:
+                qemu.qemu_vfree(self._buf)
+            self._buf = qemu.blk_blockalign(self._blk, size)
+            self._buf_size = size
+
+        read_size = num_sectors << qemu.BDRV_SECTOR_BITS
+        ret = qemu.blk_pread(
+            self._blk, offset, self._buf, read_size)
+        if ret < 0:
+            raise exception.QEMUException("blk_pread failed")
+
+        return (ctypes.c_ubyte*read_size).from_address(self._buf)
+
+
+def _qemu_init():
+    """Initialise the libqemu subsystems needed to open disk images.
+
+    Raises QEMUException if the main loop or the crypto layer fails to
+    initialise.
+    """
+    error = ctypes.POINTER(qemu.Error)()
+
+    def _message(default):
+        # ``error`` is a pointer filled in by libqemu: the struct fields are
+        # only reachable through ``.contents``, and a NULL pointer is falsy.
+        if error and error.contents.msg:
+            return error.contents.msg.decode("utf-8", "replace")
+        return default
+
+    qemu.module_call_init(qemu.MODULE_INIT_TRACE)
+    qemu.error_set_progname('coriolis'.encode())
+    qemu.qemu_init_exec_dir('.'.encode())
+
+    if qemu.qemu_init_main_loop(ctypes.byref(error)):
+        raise exception.QEMUException(
+            _message("qemu_init_main_loop failed"))
+
+    if qemu.qcrypto_init(ctypes.byref(error)):
+        raise exception.QEMUException(_message("qcrypto_init failed"))
+
+    qemu.module_call_init(qemu.MODULE_INIT_QOM)
+    qemu.bdrv_init()
+
+
+_qemu_init()