Gabriel пре 8 година
родитељ
комит
9784168a74

+ 7 - 7
coriolis/conductor/rpc/server.py

@@ -531,22 +531,22 @@ class ConductorServerEndpoint(object):
                 instance, constants.TASK_TYPE_IMPORT_INSTANCE,
                 execution, depends_on=[task_export.id])
 
-            task_deploy_disk_copy_worker = self._create_task(
-                instance, constants.TASK_TYPE_DEPLOY_DISK_COPY_WORKER,
+            task_deploy_disk_copy_resources = self._create_task(
+                instance, constants.TASK_TYPE_DEPLOY_DISK_COPY_RESOURCES,
                 execution, depends_on=[task_import.id])
 
             task_copy_disk = self._create_task(
                 instance, constants.TASK_TYPE_COPY_DISK_DATA,
-                execution, depends_on=[task_deploy_disk_copy_worker.id])
+                execution, depends_on=[task_deploy_disk_copy_resources.id])
 
-            task_delete_disk_copy_worker = self._create_task(
-                instance, constants.TASK_TYPE_DELETE_DISK_COPY_WORKER,
+            task_delete_disk_copy_resources = self._create_task(
+                instance, constants.TASK_TYPE_DELETE_DISK_COPY_RESOURCES,
                 execution, depends_on=[task_copy_disk.id], on_error=True)
 
             if not skip_os_morphing:
                 task_deploy_os_morphing_resources = self._create_task(
                     instance, constants.TASK_TYPE_DEPLOY_OS_MORPHING_RESOURCES,
-                    execution, depends_on=[task_delete_disk_copy_worker.id])
+                    execution, depends_on=[task_delete_disk_copy_resources.id])
 
                 task_osmorphing = self._create_task(
                     instance, constants.TASK_TYPE_OS_MORPHING,
@@ -560,7 +560,7 @@ class ConductorServerEndpoint(object):
 
                 next_task = task_delete_os_morphing_resources
             else:
-                next_task = task_delete_disk_copy_worker
+                next_task = task_delete_disk_copy_resources
 
             self._create_task(
                 instance, constants.TASK_TYPE_FINALIZE_IMPORT_INSTANCE,

+ 2 - 2
coriolis/constants.py

@@ -17,9 +17,9 @@ TASK_TYPE_IMPORT_INSTANCE = "IMPORT_INSTANCE"
 TASK_TYPE_FINALIZE_IMPORT_INSTANCE = "FINALIZE_IMPORT_INSTANCE"
 TASK_TYPE_CLEANUP_FAILED_IMPORT_INSTANCE = "CLEANUP_FAILED_IMPORT_INSTANCE"
 
-TASK_TYPE_DEPLOY_DISK_COPY_WORKER = "DEPLOY_DISK_COPY_WORKER"
+TASK_TYPE_DEPLOY_DISK_COPY_RESOURCES = "DEPLOY_DISK_COPY_RESOURCES"
 TASK_TYPE_COPY_DISK_DATA = "COPY_DISK_DATA"
-TASK_TYPE_DELETE_DISK_COPY_WORKER = "DELETE_DISK_COPY_WORKER"
+TASK_TYPE_DELETE_DISK_COPY_RESOURCES = "DELETE_DISK_COPY_RESOURCES"
 
 
 TASK_TYPE_DEPLOY_OS_MORPHING_RESOURCES = "DEPLOY_OS_MORPHING_RESOURCES"

+ 8 - 0
coriolis/exception.py

@@ -302,3 +302,11 @@ class ConnectionValidationException(CoriolisException):
 
 class SchemaValidationException(CoriolisException):
     safe = True
+
+
+class NBDConnectionException(Exception):
+    """Signals that the NBD client socket is not connected."""
+    pass
+
+
+class NBDException(Exception):
+    """Generic error for NBD negotiation, reads and qemu-nbd handling."""
+    pass

+ 104 - 78
coriolis/migrations/manager.py

@@ -1,10 +1,13 @@
 # Copyright 2017 Cloudbase Solutions Srl
 # All Rights Reserved.
 
+import eventlet
 import gc
 import uuid
+import sys
 
 from oslo_log import log as logging
+from oslo_utils import units
 
 from coriolis import events
 from coriolis import nbd
@@ -14,16 +17,95 @@ from coriolis import utils
 LOG = logging.getLogger(__name__)
 
 
+def _copy_volume(volume, backup_writer, event_manager):
+    """Copy the non-zero regions of one volume's disk image to the target.
+
+    The image referenced by ``volume["disk_image_uri"]`` is read through
+    ``nbd.DiskImageReader`` in 4096-byte chunks. Chunks made up only of
+    zero bytes are skipped; runs of consecutive non-zero chunks are
+    buffered and written at their original offset through the writer
+    returned by ``backup_writer.open("", disk_id)``.
+
+    :param volume: dict providing the "disk_id", "disk_image_uri" and
+        "volume_id" keys.
+    :param backup_writer: object whose ``open("", disk_id)`` returns a
+        context manager yielding a writer with ``seek()`` and ``write()``.
+    :param event_manager: object used for progress reporting through
+        ``add_percentage_step()``, ``set_percentage_step()`` and
+        ``progress_update()``.
+    """
+    disk_id = volume["disk_id"]
+    # for now we assume it is a local file
+    virtual_disk = volume["disk_image_uri"]
+    # just an identifier. We use it to create a socket path
+    # that we pass to qemu-nbd
+    name = str(uuid.uuid4())
+    vol_id = volume["volume_id"]
+    chunk = 4096
+    flush = 10 * units.Mi  # 10 MB
+
+    with backup_writer.open("", disk_id) as writer:
+        with nbd.DiskImageReader(virtual_disk, name) as reader:
+            export_size = reader.export_size
+            perc_step = event_manager.add_percentage_step(
+                export_size,
+                message_format="Disk copy progress for %s: "
+                               "{:.0f}%%" % vol_id)
+
+            offset = 0
+            write_offset = 0
+            written = 0
+            # A bytearray grows in place; repeatedly concatenating
+            # immutable bytes would copy the whole buffer on every chunk.
+            buff = bytearray()
+            emptyChunk = b'\0' * chunk
+            data = None
+            while offset < export_size:
+                # The final read may be shorter than a full chunk, in
+                # which case the zero pattern is shortened to match.
+                readBytes = min(chunk, export_size - offset)
+                if readBytes < chunk:
+                    emptyChunk = emptyChunk[0:readBytes]
+                is_last_chunk = offset + readBytes >= export_size
+
+                data = reader.read(offset, readBytes)
+                if data == emptyChunk:
+                    # got a whole lot of nothing. Not putting
+                    # it on the wire
+                    if buff:
+                        LOG.debug(
+                            "Found empty chunk. Flushing buffer:"
+                            " %r starting at offset: %r",
+                            len(buff), write_offset)
+                        writer.seek(write_offset)
+                        writer.write(bytes(buff))
+                        written += len(buff)
+                        event_manager.set_percentage_step(
+                            perc_step, offset)
+                        buff = bytearray()
+                else:
+                    if not buff:
+                        LOG.debug(
+                            "Found written chunk at %r."
+                            " Last written offset: %r",
+                            offset, write_offset)
+                        write_offset = offset
+                    buff += data
+                    # Flush when the buffer is full, and also on the very
+                    # last chunk: otherwise a trailing run of data smaller
+                    # than the flush threshold would never be written.
+                    if len(buff) >= flush or is_last_chunk:
+                        writer.seek(write_offset)
+                        writer.write(bytes(buff))
+                        written += len(buff)
+                        buff = bytearray()
+                offset += readBytes
+                if offset % flush == 0 or export_size == offset:
+                    event_manager.set_percentage_step(
+                        perc_step, offset)
+            event_manager.progress_update(
+                "Total bytes sent over wire for volume \"%s\": %r MB" % (
+                    vol_id, int(written / units.Mi)))
+            # Drop the large buffers before forcing a collection.
+            emptyChunk = None
+            buff = None
+            data = None
+            gc.collect()
+
+
+def _copy_wrapper(job_args):
+    """Run ``_copy_volume`` for one job and report the outcome as a tuple.
+
+    :param job_args: positional arguments for ``_copy_volume``; the first
+        element must be the volume dict.
+    :returns: ``(result, volume_id, False)`` on success, or
+        ``(sys.exc_info(), volume_id, True)`` if anything was raised.
+    """
+    volume_id = job_args[0].get("volume_id")
+    try:
+        outcome = _copy_volume(*job_args)
+    except BaseException:
+        # Hand the failure back to the caller instead of raising here.
+        return sys.exc_info(), volume_id, True
+    return outcome, volume_id, False
+
+
 def copy_disk_data(target_conn_info, volumes_info, event_handler):
-    # TODO: the disk image should be an URI that can either be local
-    #       (file://) or remote (https://, ftp://, smb://, nbd:// etc).
-    #       This must happen if we are to implement multi-worker scenarios.
-    #       In such cases, it is not guaranteed that the disk sync task
-    #       will be started on the same node onto which the import
-    #       happened. It may also be conceivable, that wherever the disk
-    #       image ends up, we might be able to directly expose it using
-    #       NBD, iSCSI or any other network protocol. In which case,
-    #       we can skip downloading it locally just to sync it.
+    # TODO (gsamfira): the disk image should be a URI that can either be local
+    # (file://) or remote (https://, ftp://, smb://, nbd:// etc).
+    # This must happen if we are to implement multi-worker scenarios.
+    # In such cases, it is not guaranteed that the disk sync task
+    # will be started on the same node onto which the import
+    # happened. It may also be conceivable, that wherever the disk
+    # image ends up, we might be able to directly expose it using
+    # NBD, iSCSI or any other network protocol. In which case,
+    # we can skip downloading it locally just to sync it.
 
     event_manager = events.EventManager(event_handler)
 
@@ -32,76 +114,20 @@ def copy_disk_data(target_conn_info, volumes_info, event_handler):
     username = target_conn_info["username"]
     pkey = target_conn_info.get("pkey")
     password = target_conn_info.get("password")
+    event_manager.progress_update("Waiting for connectivity on %r:%r" % (
+        ip, port))
     utils.wait_for_port_connectivity(ip, port)
     backup_writer = backup_writers.SSHBackupWriter(
         ip, port, username, pkey, password, volumes_info)
 
-    chunk = 4096
-
-    for volume in volumes_info:
-        disk_id = volume["disk_id"]
-        # for now we assume it is a local file
-        virtual_disk = volume["disk_image_uri"]
-        # just an identifier. We use it to create a socket path
-        # that we pass to qemu-nbd
-        vol_name = volume.get("volume_name") or str(uuid.uuid4())
-
-        with backup_writer.open("", disk_id) as writer:
-            with nbd.ImageReader(virtual_disk, vol_name) as reader:
-                perc_step = event_manager.add_percentage_step(
-                    reader.export_size,
-                    message_format="Disk copy progress for %s: "
-                                   "{:.0f}%%" % vol_name)
-
-                offset = 0
-                write_offset = 0
-                buff = b''
-                flush = 10 * 1024 * 1024  # 10 MB
-                emptyChunk = b'\0' * chunk
-                written = 0
-                export_size = reader.export_size
-                while offset < export_size:
-                    readBytes = chunk
-                    remaining = export_size - offset
-                    remainingDelta = remaining - chunk
-                    if remainingDelta <= 0:
-                        readBytes = remaining
-                        emptyChunk = emptyChunk[0:readBytes]
-
-                    data = reader.read(offset, readBytes)
-                    if data == emptyChunk:
-                        # got a whole lot of nothing. Not putting
-                        # it on the wire
-                        if len(buff) > 0:
-                            LOG.info("Found empty chunk. Flushing buffer:"
-                                     " %r starting at offset: %r" % (
-                                         len(buff), write_offset))
-                            writer.seek(write_offset)
-                            writer.write(buff)
-                            written += len(buff)
-                            event_manager.set_percentage_step(
-                                perc_step, offset)
-                            buff = b''
-                    else:
-                        if len(buff) == 0:
-                            LOG.info("Found written chunk at %r."
-                                     " Last written offset: %r" % (
-                                         offset, write_offset))
-                            write_offset = offset
-                        buff += data
-                        if len(buff) >= flush or export_size == offset:
-                            writer.seek(write_offset)
-                            writer.write(buff)
-                            written += len(buff)
-                            buff = b''
-                    offset += readBytes
-                    if offset % flush == 0 or export_size == offset:
-                        event_manager.set_percentage_step(
-                            perc_step, offset)
-                event_manager.progress_update(
-                    "Total bytes sent over wire: %r MB" % (
-                        written / 1024 / 1024))
-                emptyChunk = None
-                buff = None
-                data = None
-                gc.collect()
+    pool = eventlet.greenpool.GreenPool()
+    job_data = [(vol, backup_writer, event_manager) for vol in volumes_info]
+    for result, vol_id, error in pool.imap(_copy_wrapper, job_data):
+        # TODO (gsamfira): There is no use in letting the other disks finish
+        # sync-ing as we don't save the state of the disk sync anywhere (yet).
+        # When/If we ever do add this info to the database, keep track of
+        # failures, and allow any other parallel sync to finish
+        if error:
+            event_manager.progress_update(
+                "Volume \"%s\" failed to sync" % vol_id)
+            raise result[0](result[1]).with_traceback(result[2])

+ 72 - 42
coriolis/nbd.py

@@ -3,9 +3,9 @@ There is currently no authentication support, nor does this implement
 TLS in any way. It is a really basic implementation designed to be used
 to stream blocks from images to coriolis workers.
 
-TODO: implement authentication. Looks like qemu-nbd supports something of
-      the sort
-TODO: implement TLS support
+TODO (gsamfira): implement authentication. Looks like qemu-nbd supports
+something of the sort
+TODO (gsamfira): implement TLS support
 
 With the above 2 implemented, all sorts of fun things could potentially
 be done.
@@ -22,6 +22,8 @@ import os
 import netaddr
 import time
 
+from coriolis import exception
+from coriolis import utils
 from oslo_log import log as logging
 
 LOG = logging.getLogger(__name__)
@@ -119,10 +121,10 @@ class NBDClient(object):
     for consuming chunks of an export, or the entire thing.
 
     WARNING: Do not try to do parallel reads using this class. It will
-             most likely result in garbage data, due to the fact that
-             handles are not properly implemented. That whole song and
-             dance requires more complex code. Sequential reads only
-             at this point please.
+    most likely result in garbage data, due to the fact that
+    handles are not properly implemented. That whole song and
+    dance requires more complex code. Sequential reads only
+    at this point please.
     """
     def __init__(self, host=None, port=None,
                  unix_socket=None, export_name=None):
@@ -130,11 +132,11 @@ class NBDClient(object):
         self.export_size = None
         self.export_name = export_name
         self._handle = b'1'
-        self.sock = self.connect(
-            host=host,
-            port=port,
-            unix_socket=unix_socket,
-            export_name=export_name)
+        self._host = host
+        self._port = port
+        self._unix_socket = unix_socket
+        self._export_name = export_name
+        self.sock = None
 
     def _select_export(self, sock, name):
         if type(name) is str:
@@ -147,7 +149,8 @@ class NBDClient(object):
         sock.sendall(payload)
         response = sock.recv(64)
         if len(response) == 0:
-            raise Exception("Read failed. Likely export name is wrong")
+            raise exception.NBDException(
+                "Read failed. Likely export name is wrong")
         decoded = struct.unpack('>QH', response)
         return decoded[0]
 
@@ -158,7 +161,7 @@ class NBDClient(object):
         passwdSize = struct.calcsize('>8s')
         passwd = struct.unpack('>8s', sock.recv(passwdSize))
         if passwd[0] != NBD_INIT_PASSWD:
-            raise Exception("Bad NBD passwd: %r --> %r" % (
+            raise exception.NBDException("Bad NBD passwd: %r. Expected: %r" % (
                 passwd[0], NBD_INIT_PASSWD))
 
         magicSize = struct.calcsize('>Q')
@@ -183,7 +186,8 @@ class NBDClient(object):
             flags = struct.unpack('>H', sock.recv(struct.calcsize('>H')))
             needed = flags[0] & NBD_FLAG_C_FIXED_NEWSTYLE
             if needed != NBD_FLAG_C_FIXED_NEWSTYLE:
-                raise Exception("Server does not support export listing")
+                raise exception.NBDException(
+                    "Server does not support export listing")
             if flags[0] & NBD_FLAG_NO_ZEROES:
                 self._client_flags |= NBD_FLAG_NO_ZEROES
             # Send client flags
@@ -193,41 +197,59 @@ class NBDClient(object):
 
     def connect(self, host=None, port=None,
                 unix_socket=None, export_name=None):
-        # there is no TLS support. Make sure you only use this
+        # WARNING: there is no TLS support. Make sure you only use this
         # for local connections, or in secure environments
+
+        _host = host or self._host
+        _port = port or self._port
+        _unix_socket = unix_socket or self._unix_socket
+        _export_name = export_name or self._export_name
+
+        if self.sock:
+            # we are reconnecting. Clean up after ourselves
+            self.close()
+
         sock = None
         addr = None
-        if socket is not None:
+        if _unix_socket is not None:
             # no need to do extra checks, socket will raise
             # if the supplied path does not exist
             sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-            addr = unix_socket
+            addr = _unix_socket
 
-        if None not in (host, port):
-            ipVersion = netaddr.IPNetwork(host).version
+        if None not in (_host, _port):
+            ipVersion = netaddr.IPNetwork(_host).version
             inet = socket.AF_INET
             if ipVersion == 6:
                 inet = socket.AF_INET6
             sock = socket.socket(inet, socket.SOCK_STREAM)
-            addr = (host, port)
+            addr = (_host, _port)
 
         if sock is None:
-            raise Exception("either host/port or socket needs to be set")
+            raise exception.NBDException(
+                "either host/port or socket needs to be set")
 
         try:
             sock.connect(addr)
         except socket.error as err:
             if err.errno == 106:
                 # already connected, just return
-                # WARNING: this assumes that negotiation
-                #          has already happened
+                # NOTE (gsamfira): this assumes that negotiation
+                # has already happened
                 return sock
             raise
-        self._negotiate(sock, name=export_name)
+        self._negotiate(sock, name=_export_name)
         self.sock = sock
+
+        self._host = _host
+        self._port = _port
+        self._unix_socket = _unix_socket
+        self._export_name = _export_name
         return self.sock
 
     def close(self):
+        if self.sock is None:
+            return
         request = struct.pack(
             '>LL8sQL',
             NBD_REQUEST_MAGIC,
@@ -241,6 +263,9 @@ class NBDClient(object):
         self.export_size = None
 
     def read(self, offset, length):
+        if self.sock is None:
+            raise exception.NBDConnectionException(
+                "Socket is not connected")
         if offset > self.export_size:
             raise ValueError("Offset is outside the size of export")
         readEnd = offset + length
@@ -250,12 +275,11 @@ class NBDClient(object):
             '>LL8sQL',
             NBD_REQUEST_MAGIC,
             NBD_CMD_READ,
-            # WARNING: this function is not safe for
-            #          concurrent reads! Must not run
-            #          read() in parallel.
-            # TODO: Implement concurrency. Needs to treat
-            #       handles appropriately. Responses are treated
-            #       asynchronously  and may come out of order
+            # NOTE (gsamfira): this function is not safe for
+            # concurrent reads! Must not run read() in parallel.
+            # TODO (gsamfira): Implement concurrency. Needs to treat
+            # handles appropriately. Responses are treated
+            # asynchronously and may come out of order
             self._handle,
             offset,
             length)
@@ -266,22 +290,23 @@ class NBDClient(object):
         magic, error, handle = struct.unpack('>LL8s', response)
         if magic != int(NBD_REPLY_MAGIC):
             raise ValueError(
-                "Got invalid response from "
-                "server: %r --> %r --> %r" % (magic, error, handle))
+                "Got invalid magic from "
+                "server: %r" % magic)
         if error != 0:
+            # TODO (gsamfira): translate error codes to messages
             raise Exception(
                 "Got invalid response from "
-                "server: %r --> %r --> %r" % (magic, error, handle))
+                "server: %r" % error)
         got = b''
         while len(got) < length:
             more = self.sock.recv(length - len(got))
             if more == "":
-                raise Exception(length)
+                raise exception.NBDException(length)
             got += more
         return got
 
 
-class ImageReader(object):
+class DiskImageReader(object):
 
     def __init__(self, path, name):
         """
@@ -310,7 +335,8 @@ class ImageReader(object):
             status = process.poll()
             if status:
                 stdout, stderr = process.communicate()
-                raise Exception("process failed with status: %r" % status)
+                raise exception.NBDException(
+                    "process failed with status: %r" % status)
             if os.path.exists(socket_path):
                 return socket_path
             time.sleep(0.1)
@@ -325,12 +351,13 @@ class ImageReader(object):
         if os.path.isfile(self.image_path) is False:
             raise ValueError("Image file %s does not exist" % self.image_path)
         if self._qemu_process is not None:
-            raise Exception("qemu-nbd is already running")
+            raise exception.NBDException("qemu-nbd is already running")
         if self._nbd_client is not None:
-            raise Exception("client already created")
+            raise exception.NBDException("client already created")
 
         if os.path.exists(self.socket_path):
-            raise Exception("socket %s already exists" % self.socket_path)
+            raise exception.NBDException(
+                "socket %s already exists" % self.socket_path)
 
         qemu_cmd = [
             "qemu-nbd", "-k", self.socket_path,
@@ -343,13 +370,14 @@ class ImageReader(object):
                 "-x", self.export_name, self.image_path
             ]
 
-        LOG.info("Running command: %s" % ' '.join(qemu_cmd))
+        LOG.debug("Running command: %s" % ' '.join(qemu_cmd))
         self._qemu_process = subprocess.Popen(qemu_cmd)
         self._wait_for_socket(self._qemu_process, self.socket_path)
 
         self._nbd_client = NBDClient(
             unix_socket=self.socket_path,
             export_name=self.export_name)
+        self._nbd_client.connect()
 
     def close(self):
         if self._nbd_client:
@@ -367,9 +395,11 @@ class ImageReader(object):
         except BaseException:
             pass
 
+    @utils.retry_on_error(terminal_exceptions=[
+        exception.NBDException, exception.NBDConnectionException])
     def read(self, offset, length):
         if self._nbd_client is None:
-            raise Exception("not initialized properly")
+            raise exception.NBDException("not initialized properly")
         return self._nbd_client.read(offset, length)
 
     def __enter__(self):

+ 4 - 4
coriolis/tasks/factory.py

@@ -14,12 +14,12 @@ _TASKS_MAP = {
         migration_tasks.ImportInstanceTask,
     constants.TASK_TYPE_FINALIZE_IMPORT_INSTANCE:
         migration_tasks.FinalizeImportInstanceTask,
-    constants.TASK_TYPE_DEPLOY_DISK_COPY_WORKER:
-        migration_tasks.DeployDiskCopyWorker,
+    constants.TASK_TYPE_DEPLOY_DISK_COPY_RESOURCES:
+        migration_tasks.DeployDiskCopyResources,
     constants.TASK_TYPE_COPY_DISK_DATA:
         migration_tasks.CopyDiskData,
-    constants.TASK_TYPE_DELETE_DISK_COPY_WORKER:
-        migration_tasks.DeleteDiskCopyWorker,
+    constants.TASK_TYPE_DELETE_DISK_COPY_RESOURCES:
+        migration_tasks.DeleteDiskCopyResources,
     constants.TASK_TYPE_CLEANUP_FAILED_IMPORT_INSTANCE:
         migration_tasks.CleanupFailedImportInstanceTask,
     constants.TASK_TYPE_DEPLOY_OS_MORPHING_RESOURCES:

+ 8 - 8
coriolis/tasks/migration_tasks.py

@@ -52,14 +52,14 @@ class ImportInstanceTask(base.TaskRunner):
         task_info["origin_provider_type"] = constants.PROVIDER_TYPE_EXPORT
         task_info["destination_provider_type"] = constants.PROVIDER_TYPE_IMPORT
         # We need to retain export info until after disk sync
-        # TODO: remove this when we implement multi-worker, and by extension
-        #       some external storage for needed resources (like swift)
+        # TODO (gsamfira): remove this when we implement multi-worker, and by
+        # extension some external storage for needed resources (like swift)
         task_info["retain_export_path"] = True
 
         return task_info
 
 
-class DeployDiskCopyWorker(base.TaskRunner):
+class DeployDiskCopyResources(base.TaskRunner):
     def run(self, ctxt, instance, origin, destination, task_info,
             event_handler):
         provider = providers_factory.get_provider(
@@ -68,7 +68,7 @@ class DeployDiskCopyWorker(base.TaskRunner):
         target_environment = destination.get("target_environment") or {}
         instance_deployment_info = task_info["instance_deployment_info"]
 
-        resources_info = provider.deploy_disk_copy_worker(
+        resources_info = provider.deploy_disk_copy_resources(
             ctxt, connection_info, target_environment,
             instance_deployment_info)
 
@@ -80,8 +80,8 @@ class DeployDiskCopyWorker(base.TaskRunner):
         task_info["instance_deployment_info"][
             "disk_sync_connection_info"] = conn_info
         # We need to retain export info until after disk sync
-        # TODO: remove this when we implement multi-worker, and by extension
-        #       some external storage for needed resources (like swift)
+        # TODO (gsamfira): remove this when we implement multi-worker, and by
+        # extension some external storage for needed resources (like swift)
         task_info["retain_export_path"] = True
 
         return task_info
@@ -108,7 +108,7 @@ class CopyDiskData(base.TaskRunner):
         return task_info
 
 
-class DeleteDiskCopyWorker(base.TaskRunner):
+class DeleteDiskCopyResources(base.TaskRunner):
     def run(self, ctxt, instance, origin, destination, task_info,
             event_handler):
         provider = providers_factory.get_provider(
@@ -116,7 +116,7 @@ class DeleteDiskCopyWorker(base.TaskRunner):
         connection_info = base.get_connection_info(ctxt, destination)
         instance_deployment_info = task_info.get(
             "instance_deployment_info", {})
-        provider.delete_disk_copy_worker(
+        provider.delete_disk_copy_resources(
             ctxt, connection_info, instance_deployment_info)
 
         if instance_deployment_info.get("disk_sync_connection_info"):