Explorar o código

Merged in alexpilotti/coriolis/libqemu (pull request #77)

Add QEMU disk reader
Alessandro Pilotti hai 8 anos
pai
achega
379cc15531
Modificáronse 5 ficheiros con 286 adicións e 455 borrados
  1. 1 5
      coriolis/exception.py
  2. 35 40
      coriolis/migrations/manager.py
  3. 0 410
      coriolis/nbd.py
  4. 121 0
      coriolis/qemu.py
  5. 129 0
      coriolis/qemu_reader.py

+ 1 - 5
coriolis/exception.py

@@ -304,9 +304,5 @@ class SchemaValidationException(CoriolisException):
     safe = True
 
 
-class NBDConnectionException(Exception):
-    pass
-
-
-class NBDException(Exception):
+class QEMUException(Exception):
     pass

+ 35 - 40
coriolis/migrations/manager.py

@@ -3,63 +3,56 @@
 
 import eventlet
 import gc
-import uuid
 import sys
 
 from oslo_log import log as logging
 from oslo_utils import units
 
 from coriolis import events
-from coriolis import nbd
+from coriolis import qemu_reader
 from coriolis.providers import backup_writers
 from coriolis import utils
 
 LOG = logging.getLogger(__name__)
 
 
-def _copy_volume(volume, backup_writer, event_manager):
+def _copy_volume(volume, disk_image_reader, backup_writer, event_manager):
     disk_id = volume["disk_id"]
     # for now we assume it is a local file
-    virtual_disk = volume["disk_image_uri"]
-    # just an identifier. We use it to create a socket path
-    # that we pass to qemu-nbd
-    name = str(uuid.uuid4())
+    path = volume["disk_image_uri"]
+    skip_zeroes = volume.get("zeroed", False)
 
     with backup_writer.open("", disk_id) as writer:
-        with nbd.DiskImageReader(virtual_disk, name) as reader:
+        with disk_image_reader.open(path) as reader:
+            disk_size = reader.disk_size
+
             perc_step = event_manager.add_percentage_step(
-                reader.export_size,
+                disk_size,
                 message_format="Disk copy progress for %s: "
                                "{:.0f}%%" % disk_id)
-            chunk = 4096
+
             offset = 0
-            write_offset = 0
-            buff = b''
-            flush = 10 * units.Mi  # 10 MB
-            export_size = reader.export_size
-            while offset < export_size:
-                readBytes = chunk
-                remaining = export_size - offset
-                remainingDelta = remaining - chunk
-                if remainingDelta <= 0:
-                    readBytes = remaining
-
-                if len(buff) == 0:
-                    write_offset = offset
-
-                data = reader.read(offset, readBytes)
-                offset += readBytes
-
-                buff += data
-                if len(buff) >= flush or export_size == offset:
-                    writer.seek(write_offset)
-                    writer.write(buff)
-                    buff = b''
-                    event_manager.set_percentage_step(
-                        perc_step, offset)
-            buff = None
-            data = None
-            gc.collect()
+            max_block_size = 1 * units.Mi  # 1 MB
+
+            while offset < disk_size:
+                allocated, zero_block, block_size = reader.get_block_status(
+                    offset, max_block_size)
+                if not allocated or zero_block and skip_zeroes:
+                    if not allocated:
+                        LOG.debug("Unallocated block detected: %s", block_size)
+                    else:
+                        LOG.debug("Skipping zero block: %s", block_size)
+                    offset += block_size
+                    writer.seek(offset)
+                else:
+                    buf = reader.read(offset, block_size)
+                    writer.write(buf)
+                    offset += len(buf)
+                    buf = None
+                    gc.collect()
+
+                event_manager.set_percentage_step(
+                    perc_step, offset)
 
 
 def _copy_wrapper(job_args):
@@ -72,13 +65,13 @@ def _copy_wrapper(job_args):
 
 def copy_disk_data(target_conn_info, volumes_info, event_handler):
     # TODO (gsamfira): the disk image should be an URI that can either be local
-    # (file://) or remote (https://, ftp://, smb://, nbd:// etc).
+    # (file://) or remote (https://, ftp://, smb://, nfs:// etc).
     # This must happen if we are to implement multi-worker scenarios.
     # In such cases, it is not guaranteed that the disk sync task
     # will be started on the same node onto which the import
     # happened. It may also be conceivable, that wherever the disk
     # image ends up, we might be able to directly expose it using
-    # NBD, iSCSI or any other network protocol. In which case,
+    # NFS, iSCSI or any other network protocol. In which case,
     # we can skip downloading it locally just to sync it.
 
     event_manager = events.EventManager(event_handler)
@@ -93,9 +86,11 @@ def copy_disk_data(target_conn_info, volumes_info, event_handler):
     utils.wait_for_port_connectivity(ip, port)
     backup_writer = backup_writers.SSHBackupWriter(
         ip, port, username, pkey, password, volumes_info)
+    disk_image_reader = qemu_reader.QEMUDiskImageReader()
 
     pool = eventlet.greenpool.GreenPool()
-    job_data = [(vol, backup_writer, event_manager) for vol in volumes_info]
+    job_data = [(vol, disk_image_reader, backup_writer, event_manager)
+                for vol in volumes_info]
     for result, disk_id, error in pool.imap(_copy_wrapper, job_data):
         # TODO (gsamfira): There is no use in letting the other disks finish
         # sync-ing as we don't save the state of the disk sync anywhere (yet).

+ 0 - 410
coriolis/nbd.py

@@ -1,410 +0,0 @@
-"""
-There is currently no authentication support, nor does this implement
-TLS in any way. It is a really basic implementation designed to be used
-to stream blocks from images to coriolis workers.
-
-TODO (gsamfira): implement authentication. Looks like qemu-nbd supports
-something of the sort
-TODO (gsamfira): implement TLS support
-
-With the above 2 implemented, all sorts of fun things could potentially
-be done.
-
-Protocol documentation at:
-
-https://sourceforge.net/p/nbd/code/ci/e6b56c12f8a18e7a7cc253c73d1f63c2cbc41e1b/tree/doc/proto.md
-"""
-
-import struct
-import socket
-import subprocess
-import os
-import netaddr
-import time
-
-from coriolis import exception
-from coriolis import utils
-from oslo_log import log as logging
-
-LOG = logging.getLogger(__name__)
-
-
-NBD_CMD_READ = 0
-# Not really needed, we only care about read
-NBD_CMD_WRITE = 1
-NBD_CMD_DISC = 2
-NBD_CMD_FLUSH = 3
-NBD_CMD_TRIM = 4
-NBD_CMD_WRITE_ZEROES = 6
-NBD_CMD_BLOCK_STATUS = 7
-NBD_CMD_RESIZE = 8
-
-NBD_INIT_PASSWD = b'NBDMAGIC'
-
-# Option types
-# Client wants to select an export name. After setting this
-# option, we move directly to transfer phase
-NBD_OPT_EXPORT_NAME = 1
-# Abort negotiation and terminate session
-NBD_OPT_ABORT = 2
-# return a list of exports
-NBD_OPT_LIST = 3
-# not in use
-NBD_OPT_PEEK_EXPORT = 4
-# client wants to initiate TLS
-NBD_OPT_STARTTLS = 5
-# Get more detailed info about an export
-NBD_OPT_INFO = 6
-# Client wishes to terminate the handshake phase
-# and move to transmission phase.
-NBD_OPT_GO = 7
-
-
-# Option reply types
-# Sent by server when it accepts the option,
-# and no further data is available
-NBD_REP_ACK = 1
-# A description of an export
-NBD_REP_SERVER = 2
-# detailed description of an aspect of an export
-NBD_REP_INFO = 3
-
-# There are a number of error reply types, all of which are denoted by
-# having bit 31 set. All error replies MAY have some data set, in which
-# case that data is an error message string suitable for display to the user.
-NBD_REP_ERR_UNSUP = 2147483649
-NBD_REP_ERR_POLICY = 2147483650
-NBD_REP_ERR_INVALID = 2147483651
-NBD_REP_ERR_PLATFORM = 2147483652
-NBD_REP_ERR_TLS_REQD = 2147483653
-NBD_REP_ERR_UNKNOWN = 2147483654
-NBD_REP_ERR_SHUTDOWN = 2147483655
-NBD_REP_ERR_BLOCK_SIZE_REQD = 2147483656
-
-# Error values
-EPERM = 1
-EIO = 5
-ENOMEM = 12
-EINVAL = 22
-ENOSPC = 28
-EOVERFLOW = 75
-ESHUTDOWN = 108
-
-# Transmission flags
-NBD_FLAG_HAS_FLAGS = 1 << 0
-NBD_FLAG_READ_ONLY = 1 << 1
-NBD_FLAG_SEND_FLUSH = 1 << 2
-NBD_FLAG_SEND_FUA = 1 << 3
-NBD_FLAG_ROTATIONAL = 1 << 4
-NBD_FLAG_SEND_TRIM = 1 << 5
-NBD_FLAG_SEND_WRITE_ZEROES = 1 << 6
-NBD_FLAG_SEND_DF = 1 << 7
-NBD_FLAG_CAN_MULTI_CONN = 1 << 8
-NBD_FLAG_SEND_BLOCK_STATUS = 1 << 9
-NBD_FLAG_SEND_RESIZE = 1 << 10
-
-# New style server that supports extending
-NBD_FLAG_C_FIXED_NEWSTYLE = 1 << 0
-# Do not send the 128 bytes of empty zeroes
-NBD_FLAG_NO_ZEROES = 1 << 1
-
-NBD_OPTS_MAGIC = 0x49484156454F5054
-NBD_SERVER_REPLY_MAGIC = 0x3e889045565a9
-NBD_CLISERV_MAGIC = 0x420281861253
-NBD_REQUEST_MAGIC = 0x25609513
-NBD_REPLY_MAGIC = 0x67446698
-
-
-class NBDClient(object):
-    """
-    Really basic, READ-ONLY NBD client implementation. Only useful
-    for consuming chunks of an export, or the entire thing.
-
-    WARNING: Do not try to do parallel reads using this class. It will
-    most likely result in garbage data, due to the fact that
-    handles are not properly implemented. That whole song and
-    dance requires more complex code. Sequential reads only
-    at this point please.
-    """
-    def __init__(self, host=None, port=None,
-                 unix_socket=None, export_name=None):
-        self._client_flags = NBD_FLAG_C_FIXED_NEWSTYLE
-        self.export_size = None
-        self.export_name = export_name
-        self._handle = b'1'
-        self._host = host
-        self._port = port
-        self._unix_socket = unix_socket
-        self._export_name = export_name
-        self.sock = None
-
-    def _select_export(self, sock, name):
-        if type(name) is str:
-            name = bytes(name.encode("ascii"))
-        magic = struct.pack('>Q', NBD_OPTS_MAGIC)
-        opt = struct.pack('>L', NBD_OPT_EXPORT_NAME)
-        name_size = struct.pack('>L', len(name))
-
-        payload = magic + opt + name_size + name
-        sock.sendall(payload)
-        response = sock.recv(64)
-        if len(response) == 0:
-            raise exception.NBDException(
-                "Read failed. Likely export name is wrong")
-        decoded = struct.unpack('>QH', response)
-        return decoded[0]
-
-    def _negotiate(self, sock, name=None):
-        # fetch the init password. If this is invalid, either the
-        # server erred or we are trying to start a negotiation a socket
-        # that is already in transmission phase
-        passwdSize = struct.calcsize('>8s')
-        passwd = struct.unpack('>8s', sock.recv(passwdSize))
-        if passwd[0] != NBD_INIT_PASSWD:
-            raise exception.NBDException("Bad NBD passwd: %r. Expected: %r" % (
-                passwd[0], NBD_INIT_PASSWD))
-
-        magicSize = struct.calcsize('>Q')
-        magic = struct.unpack('>Q', sock.recv(magicSize))
-        if magic[0] == int(NBD_CLISERV_MAGIC):
-            # Old style negotiation is not really a negotiation. It's more
-            # like the server saying: "here you go do whatever". Not unlike
-            # a school canteen lunch lady would do when you humbly
-            # (but naively) ask for something edible.
-            LOG.info("Using old style negotiation for %s" % self.export_name)
-            info = struct.unpack('>Q128s', sock.recv(
-                struct.calcsize('>Q128s')))
-            self.export_size = info[0]
-        else:
-            # Looks like we're using new style negotiation.
-            # Export name is required in this situation
-            if name is None:
-                raise ValueError("export name is required for"
-                                 "new style negotiation")
-            # Check that we're using the FIXED_NEWSTYLE
-            # Flags are an unsigned short
-            flags = struct.unpack('>H', sock.recv(struct.calcsize('>H')))
-            needed = flags[0] & NBD_FLAG_C_FIXED_NEWSTYLE
-            if needed != NBD_FLAG_C_FIXED_NEWSTYLE:
-                raise exception.NBDException(
-                    "Server does not support export listing")
-            if flags[0] & NBD_FLAG_NO_ZEROES:
-                self._client_flags |= NBD_FLAG_NO_ZEROES
-            # Send client flags
-            client_flags = struct.pack('>L', self._client_flags)
-            sock.send(client_flags)
-            self.export_size = self._select_export(sock, name)
-
-    def connect(self, host=None, port=None,
-                unix_socket=None, export_name=None):
-        # WARNING: there is no TLS support. Make sure you only use this
-        # for local connections, or in secure environments
-
-        _host = host or self._host
-        _port = port or self._port
-        _unix_socket = unix_socket or self._unix_socket
-        _export_name = export_name or self._export_name
-
-        if self.sock:
-            # we are reconnecting. Clean up after ourselves
-            self.close()
-
-        sock = None
-        addr = None
-        if _unix_socket is not None:
-            # no need to do extra checks, socket will raise
-            # if the supplied path does not exist
-            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-            addr = _unix_socket
-
-        if None not in (_host, _port):
-            ipVersion = netaddr.IPNetwork(_host).version
-            inet = socket.AF_INET
-            if ipVersion == 6:
-                inet = socket.AF_INET6
-            sock = socket.socket(inet, socket.SOCK_STREAM)
-            addr = (_host, _port)
-
-        if sock is None:
-            raise exception.NBDException(
-                "either host/port or socket needs to be set")
-
-        try:
-            sock.connect(addr)
-        except socket.error as err:
-            if err.errno == 106:
-                # already connected, just return
-                # NOTE (gsamfira): this assumes that negotiation
-                # has already happened
-                return sock
-            raise
-        self._negotiate(sock, name=_export_name)
-        self.sock = sock
-
-        self._host = _host
-        self._port = _port
-        self._unix_socket = _unix_socket
-        self._export_name = _export_name
-        return self.sock
-
-    def close(self):
-        if self.sock is None:
-            return
-        request = struct.pack(
-            '>LL8sQL',
-            NBD_REQUEST_MAGIC,
-            NBD_CMD_DISC,
-            self._handle,
-            0,
-            0)
-        self.sock.send(request)
-        self.sock.close()
-        self.sock = None
-        self.export_size = None
-
-    def read(self, offset, length):
-        if self.sock is None:
-            raise exception.NBDConnectionException(
-                "Socket is not connected")
-        if offset > self.export_size:
-            raise ValueError("Offset is outside the size of export")
-        readEnd = offset + length
-        if readEnd > self.export_size:
-            length = self.export_size - offset
-        request = struct.pack(
-            '>LL8sQL',
-            NBD_REQUEST_MAGIC,
-            NBD_CMD_READ,
-            # NOTE (gsamfira): this function is not safe for
-            # concurrent reads! Must not run read() in parallel.
-            # TODO (gsamfira): Implement concurrency. Needs to treat
-            # handles appropriately. Responses are treated
-            # asynchronously  and may come out of order
-            self._handle,
-            offset,
-            length)
-
-        self.sock.send(request)
-        responseSize = struct.calcsize('>LL8s')
-        response = self.sock.recv(responseSize)
-        magic, error, handle = struct.unpack('>LL8s', response)
-        if magic != int(NBD_REPLY_MAGIC):
-            raise exception.NBDException(
-                "Got invalid magic from "
-                "server: %r" % magic)
-        if error != 0:
-            # TODO (gsamfira): translate error codes to messages
-            raise exception.NBDException(
-                "Got invalid response from "
-                "server: %r" % error)
-        got = b''
-        while len(got) < length:
-            more = self.sock.recv(length - len(got))
-            if more == "":
-                raise exception.NBDException(length)
-            got += more
-        return got
-
-
-class DiskImageReader(object):
-
-    def __init__(self, path, name):
-        """
-        param: path: str: The path to the virtual disk image you want to read
-        param: name: str: The name of the export
-        """
-        self.image_path = path
-        self.export_name = name
-        self.socket_path = "/tmp/%s.sock" % self.export_name
-        self._nbd_client = None
-        self._qemu_process = None
-
-    @property
-    def export_size(self):
-        if self._nbd_client is not None:
-            return self._nbd_client.export_size
-        return None
-
-    def _wait_for_socket(self, process, socket_path):
-        count = 0
-        while True:
-            # arbitrary. wait 5 seconds
-            if count >= 50:
-                raise TimeoutError("timed out waiting for"
-                                   " socket: %s" % socket_path)
-            status = process.poll()
-            if status:
-                stdout, stderr = process.communicate()
-                raise exception.NBDException(
-                    "process failed with status: %r" % status)
-            if os.path.exists(socket_path):
-                return socket_path
-            time.sleep(0.1)
-
-    def _supports_newstyle(self):
-        for i in subprocess.check_output(["qemu-nbd", "-h"]).splitlines():
-            if b'--export-name' in i:
-                return True
-        return False
-
-    def connect(self):
-        if os.path.isfile(self.image_path) is False:
-            raise ValueError("Image file %s does not exist" % self.image_path)
-        if self._qemu_process is not None:
-            raise exception.NBDException("qemu-nbd is already running")
-        if self._nbd_client is not None:
-            raise exception.NBDException("client already created")
-
-        if os.path.exists(self.socket_path):
-            raise exception.NBDException(
-                "socket %s already exists" % self.socket_path)
-
-        qemu_cmd = [
-            "qemu-nbd", "-k", self.socket_path,
-            self.image_path
-        ]
-
-        if self._supports_newstyle():
-            qemu_cmd = [
-                "qemu-nbd", "-k", self.socket_path,
-                "-x", self.export_name, self.image_path
-            ]
-
-        LOG.debug("Running command: %s" % ' '.join(qemu_cmd))
-        self._qemu_process = subprocess.Popen(qemu_cmd)
-        self._wait_for_socket(self._qemu_process, self.socket_path)
-
-        self._nbd_client = NBDClient(
-            unix_socket=self.socket_path,
-            export_name=self.export_name)
-        self._nbd_client.connect()
-
-    def close(self):
-        if self._nbd_client:
-            self._nbd_client.close()
-            self._nbd_client = None
-        if self._qemu_process:
-            # the qemu-nbd binary should already have exited on
-            # self._nbd_client.close(), but we kill it for good
-            # measure.
-            self._qemu_process.kill()
-            self._qemu_process.wait()
-            self._qemu_process = None
-        try:
-            os.remove(self.socket_path)
-        except BaseException:
-            pass
-
-    @utils.retry_on_error(terminal_exceptions=[
-        exception.NBDException, exception.NBDConnectionException])
-    def read(self, offset, length):
-        if self._nbd_client is None:
-            raise exception.NBDException("not initialized properly")
-        return self._nbd_client.read(offset, length)
-
-    def __enter__(self):
-        self.connect()
-        return self
-
-    def __exit__(self, *args):
-        self.close()

+ 121 - 0
coriolis/qemu.py

@@ -0,0 +1,121 @@
+# Copyright 2017 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import ctypes
+
+_libqemu = ctypes.CDLL('libqemu.so')
+
+MODULE_INIT_BLOCK = 0
+MODULE_INIT_OPTS = 1
+MODULE_INIT_QOM = 2
+MODULE_INIT_TRACE = 3
+MODULE_INIT_MAX = 4
+
+BDRV_BLOCK_DATA = 1
+BDRV_BLOCK_ZERO = 2
+BDRV_BLOCK_OFFSET_VALID = 4
+BDRV_BLOCK_RAW = 8
+BDRV_BLOCK_ALLOCATED = 0x10
+BDRV_BLOCK_EOF = 0x20
+
+BDRV_SECTOR_BITS = 9
+
+
+class QObject(ctypes.Structure):
+    _fields_ = [("type", ctypes.c_void_p),
+                ("refcnt", ctypes.c_size_t)]
+
+
+class QString(ctypes.Structure):
+    _fields_ = [("base", QObject),
+                ("string", ctypes.c_char_p),
+                ("length", ctypes.c_size_t),
+                ("capacity", ctypes.c_size_t)]
+
+
+class Error(ctypes.Structure):
+    _fields_ = [("msg", ctypes.c_char_p),
+                ("err_class", ctypes.c_int),
+                ("src", ctypes.c_char_p),
+                ("func", ctypes.c_char_p),
+                ("line", ctypes.c_int),
+                ("hint", ctypes.c_void_p)]
+
+
+_libqemu.qemu_vfree.argtypes = [ctypes.c_void_p]
+_libqemu.qemu_vfree.restype = None
+qemu_vfree = _libqemu.qemu_vfree
+
+_libqemu.module_call_init.argtypes = [ctypes.c_int]
+_libqemu.module_call_init.restype = None
+module_call_init = _libqemu.module_call_init
+
+_libqemu.qemu_init_exec_dir.argtypes = [ctypes.c_char_p]
+_libqemu.qemu_init_exec_dir.restype = None
+qemu_init_exec_dir = _libqemu.qemu_init_exec_dir
+
+_libqemu.qemu_init_main_loop.argtypes = [ctypes.POINTER(ctypes.POINTER(Error))]
+_libqemu.qemu_init_main_loop.res_type = ctypes.c_int
+qemu_init_main_loop = _libqemu.qemu_init_main_loop
+
+_libqemu.qcrypto_init.argtypes = [ctypes.POINTER(ctypes.POINTER(Error))]
+_libqemu.qcrypto_init.res_type = ctypes.c_int
+qcrypto_init = _libqemu.qcrypto_init
+
+_libqemu.error_set_progname.argtypes = [ctypes.c_char_p]
+_libqemu.error_set_progname.restype = None
+error_set_progname = _libqemu.error_set_progname
+
+_libqemu.error_reportf_err.argtypes = [ctypes.POINTER(Error), ctypes.c_char_p]
+_libqemu.error_reportf_err.res_type = None
+error_reportf_err = _libqemu.error_reportf_err
+
+_libqemu.qstring_from_str.argtypes = [ctypes.c_char_p]
+_libqemu.qstring_from_str.restype = ctypes.POINTER(QString)
+qstring_from_str = _libqemu.qstring_from_str
+
+_libqemu.qdict_new.argtypes = []
+_libqemu.qdict_new.res_type = ctypes.c_void_p
+qdict_new = _libqemu.qdict_new
+
+_libqemu.qdict_put_obj.argtypes = [
+    ctypes.c_void_p, ctypes.c_char_p, ctypes.POINTER(QObject)]
+_libqemu.qdict_put_obj.restype = None
+qdict_put_obj = _libqemu.qdict_put_obj
+
+_libqemu.bdrv_init.argtypes = []
+_libqemu.bdrv_init.restype = None
+bdrv_init = _libqemu.bdrv_init
+
+_libqemu.blk_new_open.argtypes = [
+    ctypes.c_char_p, ctypes.c_char_p, ctypes.c_void_p, ctypes.c_int,
+    ctypes.POINTER(ctypes.POINTER(Error))]
+_libqemu.blk_new_open.restype = ctypes.c_void_p
+blk_new_open = _libqemu.blk_new_open
+
+_libqemu.blk_blockalign.argtypes = [ctypes.c_void_p, ctypes.c_size_t]
+_libqemu.blk_blockalign.restype = ctypes.c_void_p
+blk_blockalign = _libqemu.blk_blockalign
+
+_libqemu.blk_bs.argtypes = [ctypes.c_void_p]
+_libqemu.blk_bs.restype = ctypes.c_void_p
+blk_bs = _libqemu.blk_bs
+
+_libqemu.blk_nb_sectors.argtypes = [ctypes.c_void_p]
+_libqemu.blk_nb_sectors.restype = ctypes.c_int64
+blk_nb_sectors = _libqemu.blk_nb_sectors
+
+_libqemu.blk_pread.argtypes = [
+    ctypes.c_void_p, ctypes.c_int64, ctypes.c_void_p, ctypes.c_int]
+_libqemu.blk_pread.res_type = ctypes.c_int
+blk_pread = _libqemu.blk_pread
+
+_libqemu.blk_unref.argtypes = [ctypes.c_void_p]
+_libqemu.blk_unref.restype = None
+blk_unref = _libqemu.blk_unref
+
+_libqemu.bdrv_get_block_status_above.argtypes = [
+    ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int64, ctypes.c_int,
+    ctypes.POINTER(ctypes.c_int), ctypes.POINTER(ctypes.c_void_p)]
+_libqemu.bdrv_get_block_status_above.restype = ctypes.c_int64
+bdrv_get_block_status_above = _libqemu.bdrv_get_block_status_above

+ 129 - 0
coriolis/qemu_reader.py

@@ -0,0 +1,129 @@
+# Copyright 2017 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import contextlib
+import ctypes
+
+from coriolis import exception
+from coriolis import qemu
+
+
+class QEMUDiskImageReader(object):
+    def __init__(self):
+        self._blk = None
+        self._bs = None
+        self._total_sectors = None
+        self._block_driver_state = None
+        self._buf = None
+        self._buf_size = None
+
+    def close(self):
+        if self._buf is not None:
+            qemu.qemu_vfree(self._buf)
+            self._buf = None
+        self._buf_size = None
+
+        if self._blk is not None:
+            qemu.blk_unref(self._blk)
+            self._blk = None
+
+        self._bs = None
+        self._total_sectors = None
+        self._block_driver_state = None
+
+    def _qemu_open_path(self, path):
+        error = ctypes.POINTER(qemu.Error)()
+
+        options = qemu.qdict_new()
+        blk = qemu.blk_new_open(
+            path.encode(), None, options, 0, ctypes.byref(error))
+        if not blk:
+            raise exception.QEMUException(error.msg)
+
+        self._blk = blk
+        self._bs = qemu.blk_bs(blk)
+        self._total_sectors = qemu.blk_nb_sectors(blk)
+        self._block_driver_state = ctypes.c_void_p()
+
+    @property
+    def disk_size(self):
+        return self._total_sectors << qemu.BDRV_SECTOR_BITS
+
+    @contextlib.contextmanager
+    def open(self, path):
+        try:
+            self._qemu_open_path(path)
+            yield self
+        finally:
+            self.close()
+
+    def _get_sectors(self, offset, size):
+        start_sector = offset >> qemu.BDRV_SECTOR_BITS
+        return (start_sector,
+                min(self._total_sectors - start_sector,
+                    size >> qemu.BDRV_SECTOR_BITS))
+
+    def get_block_status(self, offset, size):
+        start_sector, num_sectors = self._get_sectors(offset, size)
+
+        sectors = 0
+        block_status = None
+        while True:
+            pnum = ctypes.c_int(0)
+            status = qemu.bdrv_get_block_status_above(
+                self._bs, None, start_sector + sectors, num_sectors - sectors,
+                ctypes.byref(pnum), ctypes.byref(self._block_driver_state))
+            if status < 0 or pnum.value == 0:
+                raise exception.QEMUException(
+                    'bdrv_get_block_status_above failed')
+
+            allocated = (status & qemu.BDRV_BLOCK_ALLOCATED) > 0
+            zero_block = (status & qemu.BDRV_BLOCK_ZERO) > 0
+
+            if block_status and block_status != (allocated, zero_block):
+                break
+            block_status = (allocated, zero_block)
+
+            sectors += pnum.value
+            if sectors >= num_sectors:
+                break
+
+        block_size = min(num_sectors, sectors) << qemu.BDRV_SECTOR_BITS
+        return block_status + (block_size,)
+
+    def read(self, offset, size):
+        _, num_sectors = self._get_sectors(offset, size)
+
+        if not self._buf_size or self._buf_size < size:
+            if self._buf is not None:
+                qemu.qemu_vfree(self._buf)
+            self._buf = qemu.blk_blockalign(self._blk, size)
+            self._buf_size = size
+
+        read_size = num_sectors << qemu.BDRV_SECTOR_BITS
+        ret = qemu.blk_pread(
+            self._blk, offset, self._buf, read_size)
+        if ret < 0:
+            raise exception.QEMUException("blk_pread failed")
+
+        return (ctypes.c_ubyte*read_size).from_address(self._buf)
+
+
+def _qemu_init():
+    error = ctypes.POINTER(qemu.Error)()
+
+    qemu.module_call_init(qemu.MODULE_INIT_TRACE)
+    qemu.error_set_progname('coriolis'.encode())
+    qemu.qemu_init_exec_dir('.'.encode())
+
+    if qemu.qemu_init_main_loop(ctypes.byref(error)):
+        raise exception.QEMUException(error.msg)
+
+    if qemu.qcrypto_init(ctypes.byref(error)):
+        raise exception.QEMUException(error.msg)
+
+    qemu.module_call_init(qemu.MODULE_INIT_QOM)
+    qemu.bdrv_init()
+
+
+_qemu_init()