Просмотр исходного кода

Merged in gabriel-samfira/coriolis-core/add-nbd-client (pull request #66)

Add block level disk sync for classic migrations
Gabriel-Adrian Samfira 8 лет назад
Родитель
Сommit
9de0aa6e56

+ 14 - 2
coriolis/conductor/rpc/server.py

@@ -531,10 +531,22 @@ class ConductorServerEndpoint(object):
                 instance, constants.TASK_TYPE_IMPORT_INSTANCE,
                 instance, constants.TASK_TYPE_IMPORT_INSTANCE,
                 execution, depends_on=[task_export.id])
                 execution, depends_on=[task_export.id])
 
 
+            task_deploy_disk_copy_resources = self._create_task(
+                instance, constants.TASK_TYPE_DEPLOY_DISK_COPY_RESOURCES,
+                execution, depends_on=[task_import.id])
+
+            task_copy_disk = self._create_task(
+                instance, constants.TASK_TYPE_COPY_DISK_DATA,
+                execution, depends_on=[task_deploy_disk_copy_resources.id])
+
+            task_delete_disk_copy_resources = self._create_task(
+                instance, constants.TASK_TYPE_DELETE_DISK_COPY_RESOURCES,
+                execution, depends_on=[task_copy_disk.id], on_error=True)
+
             if not skip_os_morphing:
             if not skip_os_morphing:
                 task_deploy_os_morphing_resources = self._create_task(
                 task_deploy_os_morphing_resources = self._create_task(
                     instance, constants.TASK_TYPE_DEPLOY_OS_MORPHING_RESOURCES,
                     instance, constants.TASK_TYPE_DEPLOY_OS_MORPHING_RESOURCES,
-                    execution, depends_on=[task_import.id])
+                    execution, depends_on=[task_delete_disk_copy_resources.id])
 
 
                 task_osmorphing = self._create_task(
                 task_osmorphing = self._create_task(
                     instance, constants.TASK_TYPE_OS_MORPHING,
                     instance, constants.TASK_TYPE_OS_MORPHING,
@@ -548,7 +560,7 @@ class ConductorServerEndpoint(object):
 
 
                 next_task = task_delete_os_morphing_resources
                 next_task = task_delete_os_morphing_resources
             else:
             else:
-                next_task = task_import
+                next_task = task_delete_disk_copy_resources
 
 
             self._create_task(
             self._create_task(
                 instance, constants.TASK_TYPE_FINALIZE_IMPORT_INSTANCE,
                 instance, constants.TASK_TYPE_FINALIZE_IMPORT_INSTANCE,

+ 5 - 0
coriolis/constants.py

@@ -17,6 +17,11 @@ TASK_TYPE_IMPORT_INSTANCE = "IMPORT_INSTANCE"
 TASK_TYPE_FINALIZE_IMPORT_INSTANCE = "FINALIZE_IMPORT_INSTANCE"
 TASK_TYPE_FINALIZE_IMPORT_INSTANCE = "FINALIZE_IMPORT_INSTANCE"
 TASK_TYPE_CLEANUP_FAILED_IMPORT_INSTANCE = "CLEANUP_FAILED_IMPORT_INSTANCE"
 TASK_TYPE_CLEANUP_FAILED_IMPORT_INSTANCE = "CLEANUP_FAILED_IMPORT_INSTANCE"
 
 
+TASK_TYPE_DEPLOY_DISK_COPY_RESOURCES = "DEPLOY_DISK_COPY_RESOURCES"
+TASK_TYPE_COPY_DISK_DATA = "COPY_DISK_DATA"
+TASK_TYPE_DELETE_DISK_COPY_RESOURCES = "DELETE_DISK_COPY_RESOURCES"
+
+
 TASK_TYPE_DEPLOY_OS_MORPHING_RESOURCES = "DEPLOY_OS_MORPHING_RESOURCES"
 TASK_TYPE_DEPLOY_OS_MORPHING_RESOURCES = "DEPLOY_OS_MORPHING_RESOURCES"
 TASK_TYPE_OS_MORPHING = "OS_MORPHING"
 TASK_TYPE_OS_MORPHING = "OS_MORPHING"
 TASK_TYPE_DELETE_OS_MORPHING_RESOURCES = "DELETE_OS_MORPHING_RESOURCES"
 TASK_TYPE_DELETE_OS_MORPHING_RESOURCES = "DELETE_OS_MORPHING_RESOURCES"

+ 8 - 0
coriolis/exception.py

@@ -302,3 +302,11 @@ class ConnectionValidationException(CoriolisException):
 
 
 class SchemaValidationException(CoriolisException):
 class SchemaValidationException(CoriolisException):
     safe = True
     safe = True
+
+
+class NBDConnectionException(Exception):
+    pass
+
+
+class NBDException(Exception):
+    pass

+ 107 - 0
coriolis/migrations/manager.py

@@ -0,0 +1,107 @@
+# Copyright 2017 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import eventlet
+import gc
+import uuid
+import sys
+
+from oslo_log import log as logging
+from oslo_utils import units
+
+from coriolis import events
+from coriolis import nbd
+from coriolis.providers import backup_writers
+from coriolis import utils
+
+LOG = logging.getLogger(__name__)
+
+
+def _copy_volume(volume, backup_writer, event_manager):
+    disk_id = volume["disk_id"]
+    # for now we assume it is a local file
+    virtual_disk = volume["disk_image_uri"]
+    # just an identifier. We use it to create a socket path
+    # that we pass to qemu-nbd
+    name = str(uuid.uuid4())
+
+    with backup_writer.open("", disk_id) as writer:
+        with nbd.DiskImageReader(virtual_disk, name) as reader:
+            perc_step = event_manager.add_percentage_step(
+                reader.export_size,
+                message_format="Disk copy progress for %s: "
+                               "{:.0f}%%" % disk_id)
+            chunk = 4096
+            offset = 0
+            write_offset = 0
+            buff = b''
+            flush = 10 * units.Mi  # 10 MB
+            export_size = reader.export_size
+            while offset < export_size:
+                readBytes = chunk
+                remaining = export_size - offset
+                remainingDelta = remaining - chunk
+                if remainingDelta <= 0:
+                    readBytes = remaining
+
+                if len(buff) == 0:
+                    write_offset = offset
+
+                data = reader.read(offset, readBytes)
+                offset += readBytes
+
+                buff += data
+                if len(buff) >= flush or export_size == offset:
+                    writer.seek(write_offset)
+                    writer.write(buff)
+                    buff = b''
+                    event_manager.set_percentage_step(
+                        perc_step, offset)
+            buff = None
+            data = None
+            gc.collect()
+
+
+def _copy_wrapper(job_args):
+    disk_id = job_args[0].get("disk_id")
+    try:
+        return _copy_volume(*job_args), disk_id, False
+    except BaseException:
+        return sys.exc_info(), disk_id, True
+
+
+def copy_disk_data(target_conn_info, volumes_info, event_handler):
+    # TODO (gsamfira): the disk image should be an URI that can either be local
+    # (file://) or remote (https://, ftp://, smb://, nbd:// etc).
+    # This must happen if we are to implement multi-worker scenarios.
+    # In such cases, it is not guaranteed that the disk sync task
+    # will be started on the same node onto which the import
+    # happened. It may also be conceivable, that wherever the disk
+    # image ends up, we might be able to directly expose it using
+    # NBD, iSCSI or any other network protocol. In which case,
+    # we can skip downloading it locally just to sync it.
+
+    event_manager = events.EventManager(event_handler)
+
+    ip = target_conn_info["ip"]
+    port = target_conn_info.get("port", 22)
+    username = target_conn_info["username"]
+    pkey = target_conn_info.get("pkey")
+    password = target_conn_info.get("password")
+    event_manager.progress_update("Waiting for connectivity on %s:%s" % (
+        ip, port))
+    utils.wait_for_port_connectivity(ip, port)
+    backup_writer = backup_writers.SSHBackupWriter(
+        ip, port, username, pkey, password, volumes_info)
+
+    pool = eventlet.greenpool.GreenPool()
+    job_data = [(vol, backup_writer, event_manager) for vol in volumes_info]
+    for result, disk_id, error in pool.imap(_copy_wrapper, job_data):
+        # TODO (gsamfira): There is no use in letting the other disks finish
+        # sync-ing as we don't save the state of the disk sync anywhere (yet).
+        # When/If we ever do add this info to the database, keep track of
+        # failures, and allow any other paralel sync to finish
+        if error:
+            event_manager.progress_update(
+                "Volume \"%s\" failed to sync" % disk_id)
+            raise result[0](result[1]).with_traceback(result[2])

+ 410 - 0
coriolis/nbd.py

@@ -0,0 +1,410 @@
+"""
+There is currently no authentication support, nor does this implement
+TLS in any way. It is a really basic implementation designed to be used
+to stream blocks from images to coriolis workers.
+
+TODO (gsamfira): implement authentication. Looks like qemu-nbd supports
+something of the sort
+TODO (gsamfira): implement TLS support
+
+With the above 2 implemented, all sorts of fun things could potentially
+be done.
+
+Protocol documentation at:
+
+https://sourceforge.net/p/nbd/code/ci/e6b56c12f8a18e7a7cc253c73d1f63c2cbc41e1b/tree/doc/proto.md
+"""
+
+import struct
+import socket
+import subprocess
+import os
+import netaddr
+import time
+
+from coriolis import exception
+from coriolis import utils
+from oslo_log import log as logging
+
+LOG = logging.getLogger(__name__)
+
+
+NBD_CMD_READ = 0
+# Not really needed, we only care about read
+NBD_CMD_WRITE = 1
+NBD_CMD_DISC = 2
+NBD_CMD_FLUSH = 3
+NBD_CMD_TRIM = 4
+NBD_CMD_WRITE_ZEROES = 6
+NBD_CMD_BLOCK_STATUS = 7
+NBD_CMD_RESIZE = 8
+
+NBD_INIT_PASSWD = b'NBDMAGIC'
+
+# Option types
+# Client wants to select an export name. After setting this
+# option, we move directly to transfer phase
+NBD_OPT_EXPORT_NAME = 1
+# Abort negotiation and terminate session
+NBD_OPT_ABORT = 2
+# return a list of exports
+NBD_OPT_LIST = 3
+# not in use
+NBD_OPT_PEEK_EXPORT = 4
+# client wants to initiate TLS
+NBD_OPT_STARTTLS = 5
+# Get more detailed info about an export
+NBD_OPT_INFO = 6
+# Client wishes to terminate the handshake phase
+# and move to transmission phase.
+NBD_OPT_GO = 7
+
+
+# Option reply types
+# Sent by server when it accepts the option,
+# and no further data is available
+NBD_REP_ACK = 1
+# A description of an export
+NBD_REP_SERVER = 2
+# detailed description of an aspect of an export
+NBD_REP_INFO = 3
+
+# There are a number of error reply types, all of which are denoted by
+# having bit 31 set. All error replies MAY have some data set, in which
+# case that data is an error message string suitable for display to the user.
+NBD_REP_ERR_UNSUP = 2147483649
+NBD_REP_ERR_POLICY = 2147483650
+NBD_REP_ERR_INVALID = 2147483651
+NBD_REP_ERR_PLATFORM = 2147483652
+NBD_REP_ERR_TLS_REQD = 2147483653
+NBD_REP_ERR_UNKNOWN = 2147483654
+NBD_REP_ERR_SHUTDOWN = 2147483655
+NBD_REP_ERR_BLOCK_SIZE_REQD = 2147483656
+
+# Error values
+EPERM = 1
+EIO = 5
+ENOMEM = 12
+EINVAL = 22
+ENOSPC = 28
+EOVERFLOW = 75
+ESHUTDOWN = 108
+
+# Transmission flags
+NBD_FLAG_HAS_FLAGS = 1 << 0
+NBD_FLAG_READ_ONLY = 1 << 1
+NBD_FLAG_SEND_FLUSH = 1 << 2
+NBD_FLAG_SEND_FUA = 1 << 3
+NBD_FLAG_ROTATIONAL = 1 << 4
+NBD_FLAG_SEND_TRIM = 1 << 5
+NBD_FLAG_SEND_WRITE_ZEROES = 1 << 6
+NBD_FLAG_SEND_DF = 1 << 7
+NBD_FLAG_CAN_MULTI_CONN = 1 << 8
+NBD_FLAG_SEND_BLOCK_STATUS = 1 << 9
+NBD_FLAG_SEND_RESIZE = 1 << 10
+
+# New style server that supports extending
+NBD_FLAG_C_FIXED_NEWSTYLE = 1 << 0
+# Do not send the 128 bytes of empty zeroes
+NBD_FLAG_NO_ZEROES = 1 << 1
+
+NBD_OPTS_MAGIC = 0x49484156454F5054
+NBD_SERVER_REPLY_MAGIC = 0x3e889045565a9
+NBD_CLISERV_MAGIC = 0x420281861253
+NBD_REQUEST_MAGIC = 0x25609513
+NBD_REPLY_MAGIC = 0x67446698
+
+
+class NBDClient(object):
+    """
+    Really basic, READ-ONLY NBD client implementation. Only useful
+    for consuming chunks of an export, or the entire thing.
+
+    WARNING: Do not try to do parallel reads using this class. It will
+    most likely result in garbage data, due to the fact that
+    handles are not properly implemented. That whole song and
+    dance requires more complex code. Sequential reads only
+    at this point please.
+    """
+    def __init__(self, host=None, port=None,
+                 unix_socket=None, export_name=None):
+        self._client_flags = NBD_FLAG_C_FIXED_NEWSTYLE
+        self.export_size = None
+        self.export_name = export_name
+        self._handle = b'1'
+        self._host = host
+        self._port = port
+        self._unix_socket = unix_socket
+        self._export_name = export_name
+        self.sock = None
+
+    def _select_export(self, sock, name):
+        if type(name) is str:
+            name = bytes(name.encode("ascii"))
+        magic = struct.pack('>Q', NBD_OPTS_MAGIC)
+        opt = struct.pack('>L', NBD_OPT_EXPORT_NAME)
+        name_size = struct.pack('>L', len(name))
+
+        payload = magic + opt + name_size + name
+        sock.sendall(payload)
+        response = sock.recv(64)
+        if len(response) == 0:
+            raise exception.NBDException(
+                "Read failed. Likely export name is wrong")
+        decoded = struct.unpack('>QH', response)
+        return decoded[0]
+
+    def _negotiate(self, sock, name=None):
+        # fetch the init password. If this is invalid, either the
+        # server erred or we are trying to start a negotiation a socket
+        # that is already in transmission phase
+        passwdSize = struct.calcsize('>8s')
+        passwd = struct.unpack('>8s', sock.recv(passwdSize))
+        if passwd[0] != NBD_INIT_PASSWD:
+            raise exception.NBDException("Bad NBD passwd: %r. Expected: %r" % (
+                passwd[0], NBD_INIT_PASSWD))
+
+        magicSize = struct.calcsize('>Q')
+        magic = struct.unpack('>Q', sock.recv(magicSize))
+        if magic[0] == int(NBD_CLISERV_MAGIC):
+            # Old style negotiation is not really a negotiation. It's more
+            # like the server saying: "here you go do whatever". Not unlike
+            # a school canteen lunch lady would do when you humbly
+            # (but naively) ask for something edible.
+            LOG.info("Using old style negotiation for %s" % self.export_name)
+            info = struct.unpack('>Q128s', sock.recv(
+                struct.calcsize('>Q128s')))
+            self.export_size = info[0]
+        else:
+            # Looks like we're using new style negotiation.
+            # Export name is required in this situation
+            if name is None:
+                raise ValueError("export name is required for"
+                                 "new style negotiation")
+            # Check that we're using the FIXED_NEWSTYLE
+            # Flags are an unsigned short
+            flags = struct.unpack('>H', sock.recv(struct.calcsize('>H')))
+            needed = flags[0] & NBD_FLAG_C_FIXED_NEWSTYLE
+            if needed != NBD_FLAG_C_FIXED_NEWSTYLE:
+                raise exception.NBDException(
+                    "Server does not support export listing")
+            if flags[0] & NBD_FLAG_NO_ZEROES:
+                self._client_flags |= NBD_FLAG_NO_ZEROES
+            # Send client flags
+            client_flags = struct.pack('>L', self._client_flags)
+            sock.send(client_flags)
+            self.export_size = self._select_export(sock, name)
+
+    def connect(self, host=None, port=None,
+                unix_socket=None, export_name=None):
+        # WARNING: there is no TLS support. Make sure you only use this
+        # for local connections, or in secure environments
+
+        _host = host or self._host
+        _port = port or self._port
+        _unix_socket = unix_socket or self._unix_socket
+        _export_name = export_name or self._export_name
+
+        if self.sock:
+            # we are reconnectiong. Clean up after ourselves
+            self.close()
+
+        sock = None
+        addr = None
+        if _unix_socket is not None:
+            # no need to do extra checks, socket will raise
+            # if the supplied path does not exist
+            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+            addr = _unix_socket
+
+        if None not in (_host, _port):
+            ipVersion = netaddr.IPNetwork(_host).version
+            inet = socket.AF_INET
+            if ipVersion == 6:
+                inet = socket.AF_INET6
+            sock = socket.socket(inet, socket.SOCK_STREAM)
+            addr = (_host, _port)
+
+        if sock is None:
+            raise exception.NBDException(
+                "either host/port or socket needs to be set")
+
+        try:
+            sock.connect(addr)
+        except socket.error as err:
+            if err.errno == 106:
+                # already connected, just return
+                # NOTE (gsamfira): this assumes that negotiation
+                # has already happened
+                return sock
+            raise
+        self._negotiate(sock, name=_export_name)
+        self.sock = sock
+
+        self._host = _host
+        self._port = _port
+        self._unix_socket = _unix_socket
+        self._export_name = _export_name
+        return self.sock
+
+    def close(self):
+        if self.sock is None:
+            return
+        request = struct.pack(
+            '>LL8sQL',
+            NBD_REQUEST_MAGIC,
+            NBD_CMD_DISC,
+            self._handle,
+            0,
+            0)
+        self.sock.send(request)
+        self.sock.close()
+        self.sock = None
+        self.export_size = None
+
+    def read(self, offset, length):
+        if self.sock is None:
+            raise exception.NBDConnectionException(
+                "Socket is not connected")
+        if offset > self.export_size:
+            raise ValueError("Offset is outside the size of export")
+        readEnd = offset + length
+        if readEnd > self.export_size:
+            length = self.export_size - offset
+        request = struct.pack(
+            '>LL8sQL',
+            NBD_REQUEST_MAGIC,
+            NBD_CMD_READ,
+            # NOTE (gsamfira): this function is not safe for
+            # concurrent reads! Must not run read() in parallel.
+            # TODO (gsamfira): Implement concurrency. Needs to treat
+            # handles appropriately. Responses are treated
+            # asynchronously  and may come out of order
+            self._handle,
+            offset,
+            length)
+
+        self.sock.send(request)
+        responseSize = struct.calcsize('>LL8s')
+        response = self.sock.recv(responseSize)
+        magic, error, handle = struct.unpack('>LL8s', response)
+        if magic != int(NBD_REPLY_MAGIC):
+            raise exception.NBDException(
+                "Got invalid magic from "
+                "server: %r" % magic)
+        if error != 0:
+            # TODO (gsamfira): translate error codes to messages
+            raise exception.NBDException(
+                "Got invalid response from "
+                "server: %r" % error)
+        got = b''
+        while len(got) < length:
+            more = self.sock.recv(length - len(got))
+            if more == "":
+                raise exception.NBDException(length)
+            got += more
+        return got
+
+
+class DiskImageReader(object):
+
+    def __init__(self, path, name):
+        """
+        param: path: str: The path to the virtual disk image you want to read
+        param: name: str: The name of the export
+        """
+        self.image_path = path
+        self.export_name = name
+        self.socket_path = "/tmp/%s.sock" % self.export_name
+        self._nbd_client = None
+        self._qemu_process = None
+
+    @property
+    def export_size(self):
+        if self._nbd_client is not None:
+            return self._nbd_client.export_size
+        return None
+
+    def _wait_for_socket(self, process, socket_path):
+        count = 0
+        while True:
+            # arbitrary. wait 5 seconds
+            if count >= 50:
+                raise TimeoutError("timed out waiting for"
+                                   " socket: %s" % socket_path)
+            status = process.poll()
+            if status:
+                stdout, stderr = process.communicate()
+                raise exception.NBDException(
+                    "process failed with status: %r" % status)
+            if os.path.exists(socket_path):
+                return socket_path
+            time.sleep(0.1)
+
+    def _supports_newstyle(self):
+        for i in subprocess.check_output(["qemu-nbd", "-h"]).splitlines():
+            if b'--export-name' in i:
+                return True
+        return False
+
+    def connect(self):
+        if os.path.isfile(self.image_path) is False:
+            raise ValueError("Image file %s does not exist" % self.image_path)
+        if self._qemu_process is not None:
+            raise exception.NBDException("qemu-nbd is already running")
+        if self._nbd_client is not None:
+            raise exception.NBDException("client already created")
+
+        if os.path.exists(self.socket_path):
+            raise exception.NBDException(
+                "socket %s already exists" % self.socket_path)
+
+        qemu_cmd = [
+            "qemu-nbd", "-k", self.socket_path,
+            self.image_path
+        ]
+
+        if self._supports_newstyle():
+            qemu_cmd = [
+                "qemu-nbd", "-k", self.socket_path,
+                "-x", self.export_name, self.image_path
+            ]
+
+        LOG.debug("Running command: %s" % ' '.join(qemu_cmd))
+        self._qemu_process = subprocess.Popen(qemu_cmd)
+        self._wait_for_socket(self._qemu_process, self.socket_path)
+
+        self._nbd_client = NBDClient(
+            unix_socket=self.socket_path,
+            export_name=self.export_name)
+        self._nbd_client.connect()
+
+    def close(self):
+        if self._nbd_client:
+            self._nbd_client.close()
+            self._nbd_client = None
+        if self._qemu_process:
+            # the qemu-nbd binary should already have exited on
+            # self._nbd_client.close(), but we kill it for good
+            # measure.
+            self._qemu_process.kill()
+            self._qemu_process.wait()
+            self._qemu_process = None
+        try:
+            os.remove(self.socket_path)
+        except BaseException:
+            pass
+
+    @utils.retry_on_error(terminal_exceptions=[
+        exception.NBDException, exception.NBDConnectionException])
+    def read(self, offset, length):
+        if self._nbd_client is None:
+            raise exception.NBDException("not initialized properly")
+        return self._nbd_client.read(offset, length)
+
+    def __enter__(self):
+        self.connect()
+        return self
+
+    def __exit__(self, *args):
+        self.close()

+ 7 - 1
coriolis/providers/backup_writers.py

@@ -88,7 +88,13 @@ class SSHBackupWriter(BaseBackupWriter):
         except Exception as ex:
         except Exception as ex:
             LOG.exception(ex)
             LOG.exception(ex)
 
 
-            ret_val = self._stdout.channel.recv_exit_status()
+            ret_val = None
+            # if the application is still running on the other side,
+            # recv_exit_status() will block. Check that we have an
+            # exit status before retrieving it
+            if self._stdout.channel.exit_status_ready():
+                ret_val = self._stdout.channel.recv_exit_status()
+
             self._ssh.close()
             self._ssh.close()
 
 
             if ret_val:
             if ret_val:

+ 10 - 0
coriolis/providers/base.py

@@ -102,6 +102,16 @@ class BaseImportProvider(BaseImportInstanceProvider):
         """
         """
         pass
         pass
 
 
+    @abc.abstractmethod
+    def deploy_disk_copy_resources(self, ctxt, connection_info,
+                                   target_environment, volumes_info):
+        pass
+
+    @abc.abstractmethod
+    def delete_disk_copy_resources(self, ctxt, connection_info,
+                                   target_resources_dict):
+        pass
+
     @abc.abstractmethod
     @abc.abstractmethod
     def finalize_import_instance(self, ctxt, connection_info,
     def finalize_import_instance(self, ctxt, connection_info,
                                  instance_deployment_info):
                                  instance_deployment_info):

+ 6 - 0
coriolis/tasks/factory.py

@@ -14,6 +14,12 @@ _TASKS_MAP = {
         migration_tasks.ImportInstanceTask,
         migration_tasks.ImportInstanceTask,
     constants.TASK_TYPE_FINALIZE_IMPORT_INSTANCE:
     constants.TASK_TYPE_FINALIZE_IMPORT_INSTANCE:
         migration_tasks.FinalizeImportInstanceTask,
         migration_tasks.FinalizeImportInstanceTask,
+    constants.TASK_TYPE_DEPLOY_DISK_COPY_RESOURCES:
+        migration_tasks.DeployDiskCopyResources,
+    constants.TASK_TYPE_COPY_DISK_DATA:
+        migration_tasks.CopyDiskData,
+    constants.TASK_TYPE_DELETE_DISK_COPY_RESOURCES:
+        migration_tasks.DeleteDiskCopyResources,
     constants.TASK_TYPE_CLEANUP_FAILED_IMPORT_INSTANCE:
     constants.TASK_TYPE_CLEANUP_FAILED_IMPORT_INSTANCE:
         migration_tasks.CleanupFailedImportInstanceTask,
         migration_tasks.CleanupFailedImportInstanceTask,
     constants.TASK_TYPE_DEPLOY_OS_MORPHING_RESOURCES:
     constants.TASK_TYPE_DEPLOY_OS_MORPHING_RESOURCES:

+ 74 - 0
coriolis/tasks/migration_tasks.py

@@ -4,6 +4,8 @@
 from coriolis import constants
 from coriolis import constants
 from coriolis.providers import factory as providers_factory
 from coriolis.providers import factory as providers_factory
 from coriolis import schemas
 from coriolis import schemas
+from coriolis import exception
+from coriolis.migrations import manager
 from coriolis.tasks import base
 from coriolis.tasks import base
 
 
 from oslo_log import log as logging
 from oslo_log import log as logging
@@ -49,6 +51,78 @@ class ImportInstanceTask(base.TaskRunner):
 
 
         task_info["origin_provider_type"] = constants.PROVIDER_TYPE_EXPORT
         task_info["origin_provider_type"] = constants.PROVIDER_TYPE_EXPORT
         task_info["destination_provider_type"] = constants.PROVIDER_TYPE_IMPORT
         task_info["destination_provider_type"] = constants.PROVIDER_TYPE_IMPORT
+        # We need to retain export info until after disk sync
+        # TODO (gsamfira): remove this when we implement multi-worker, and by
+        # extension some external storage for needed resources (like swift)
+        task_info["retain_export_path"] = True
+
+        return task_info
+
+
+class DeployDiskCopyResources(base.TaskRunner):
+    def run(self, ctxt, instance, origin, destination, task_info,
+            event_handler):
+        provider = providers_factory.get_provider(
+            destination["type"], constants.PROVIDER_TYPE_IMPORT, event_handler)
+        connection_info = base.get_connection_info(ctxt, destination)
+        target_environment = destination.get("target_environment") or {}
+        instance_deployment_info = task_info["instance_deployment_info"]
+
+        resources_info = provider.deploy_disk_copy_resources(
+            ctxt, connection_info, target_environment,
+            instance_deployment_info)
+
+        conn_info = resources_info[
+            "instance_deployment_info"]["disk_sync_connection_info"]
+        conn_info = base.marshal_migr_conn_info(conn_info)
+        task_info["instance_deployment_info"] = resources_info[
+            "instance_deployment_info"]
+        task_info["instance_deployment_info"][
+            "disk_sync_connection_info"] = conn_info
+        # We need to retain export info until after disk sync
+        # TODO (gsamfira): remove this when we implement multi-worker, and by
+        # extension some external storage for needed resources (like swift)
+        task_info["retain_export_path"] = True
+
+        return task_info
+
+
+class CopyDiskData(base.TaskRunner):
+    def run(self, ctxt, instance, origin, destination, task_info,
+            event_handler):
+        instance_deployment_info = task_info["instance_deployment_info"]
+        volumes_info = instance_deployment_info["volumes_info"]
+        LOG.info("Volumes info is: %r" % volumes_info)
+
+        image_paths = [i.get("disk_image_uri") for i in volumes_info]
+        if None in image_paths:
+            raise exception.InvalidActionTasksExecutionState(
+                "disk_image_uri must be part of volumes_info for"
+                " standard migrations")
+
+        target_conn_info = base.unmarshal_migr_conn_info(
+            instance_deployment_info["disk_sync_connection_info"])
+        manager.copy_disk_data(
+            target_conn_info, volumes_info, event_handler)
+
+        return task_info
+
+
+class DeleteDiskCopyResources(base.TaskRunner):
+    def run(self, ctxt, instance, origin, destination, task_info,
+            event_handler):
+        provider = providers_factory.get_provider(
+            destination["type"], constants.PROVIDER_TYPE_IMPORT, event_handler)
+        connection_info = base.get_connection_info(ctxt, destination)
+        instance_deployment_info = task_info.get(
+            "instance_deployment_info", {})
+        provider.delete_disk_copy_resources(
+            ctxt, connection_info, instance_deployment_info)
+
+        if instance_deployment_info.get("disk_sync_connection_info"):
+            del instance_deployment_info["disk_sync_connection_info"]
+        if instance_deployment_info.get("disk_sync_tgt_resources"):
+            del instance_deployment_info["disk_sync_tgt_resources"]
 
 
         return task_info
         return task_info