Gabriel Adrian Samfira 8 лет назад
Родитель
Сommit
00c484b284

+ 14 - 2
coriolis/conductor/rpc/server.py

@@ -531,10 +531,22 @@ class ConductorServerEndpoint(object):
                 instance, constants.TASK_TYPE_IMPORT_INSTANCE,
                 execution, depends_on=[task_export.id])
 
+            task_deploy_disk_copy_worker = self._create_task(
+                instance, constants.TASK_TYPE_DEPLOY_DISK_COPY_WORKER,
+                execution, depends_on=[task_import.id])
+
+            task_copy_disk = self._create_task(
+                instance, constants.TASK_TYPE_COPY_DISK_DATA,
+                execution, depends_on=[task_deploy_disk_copy_worker.id])
+
+            task_delete_disk_copy_worker = self._create_task(
+                instance, constants.TASK_TYPE_DELETE_DISK_COPY_WORKER,
+                execution, depends_on=[task_copy_disk.id], on_error=True)
+
             if not skip_os_morphing:
                 task_deploy_os_morphing_resources = self._create_task(
                     instance, constants.TASK_TYPE_DEPLOY_OS_MORPHING_RESOURCES,
-                    execution, depends_on=[task_import.id])
+                    execution, depends_on=[task_delete_disk_copy_worker.id])
 
                 task_osmorphing = self._create_task(
                     instance, constants.TASK_TYPE_OS_MORPHING,
@@ -548,7 +560,7 @@ class ConductorServerEndpoint(object):
 
                 next_task = task_delete_os_morphing_resources
             else:
-                next_task = task_import
+                next_task = task_delete_disk_copy_worker
 
             self._create_task(
                 instance, constants.TASK_TYPE_FINALIZE_IMPORT_INSTANCE,

+ 5 - 0
coriolis/constants.py

@@ -17,6 +17,11 @@ TASK_TYPE_IMPORT_INSTANCE = "IMPORT_INSTANCE"
 TASK_TYPE_FINALIZE_IMPORT_INSTANCE = "FINALIZE_IMPORT_INSTANCE"
 TASK_TYPE_CLEANUP_FAILED_IMPORT_INSTANCE = "CLEANUP_FAILED_IMPORT_INSTANCE"
 
+TASK_TYPE_DEPLOY_DISK_COPY_WORKER = "DEPLOY_DISK_COPY_WORKER"
+TASK_TYPE_COPY_DISK_DATA = "COPY_DISK_DATA"
+TASK_TYPE_DELETE_DISK_COPY_WORKER = "DELETE_DISK_COPY_WORKER"
+
+
 TASK_TYPE_DEPLOY_OS_MORPHING_RESOURCES = "DEPLOY_OS_MORPHING_RESOURCES"
 TASK_TYPE_OS_MORPHING = "OS_MORPHING"
 TASK_TYPE_DELETE_OS_MORPHING_RESOURCES = "DELETE_OS_MORPHING_RESOURCES"

+ 107 - 0
coriolis/migrations/manager.py

@@ -0,0 +1,107 @@
+# Copyright 2017 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import gc
+import uuid
+
+from oslo_log import log as logging
+
+from coriolis import events
+from coriolis import nbd
+from coriolis.providers import backup_writers
+from coriolis import utils
+
+LOG = logging.getLogger(__name__)
+
+
+def copy_disk_data(target_conn_info, volumes_info, event_handler):
+    # TODO: the disk image should be an URI that can either be local
+    #       (file://) or remote (https://, ftp://, smb://, nbd:// etc).
+    #       This must happen if we are to implement multi-worker scenarios.
+    #       In such cases, it is not guaranteed that the disk sync task
+    #       will be started on the same node onto which the import
+    #       happened. It may also be conceivable, that wherever the disk
+    #       image ends up, we might be able to directly expose it using
+    #       NBD, iSCSI or any other network protocol. In which case,
+    #       we can skip downloading it locally just to sync it.
+
+    event_manager = events.EventManager(event_handler)
+
+    ip = target_conn_info["ip"]
+    port = target_conn_info.get("port", 22)
+    username = target_conn_info["username"]
+    pkey = target_conn_info.get("pkey")
+    password = target_conn_info.get("password")
+    utils.wait_for_port_connectivity(ip, port)
+    backup_writer = backup_writers.SSHBackupWriter(
+        ip, port, username, pkey, password, volumes_info)
+
+    chunk = 4096
+
+    for volume in volumes_info:
+        disk_id = volume["disk_id"]
+        # for now we assume it is a local file
+        virtual_disk = volume["disk_image_uri"]
+        # just an identifier. We use it to create a socket path
+        # that we pass to qemu-nbd
+        vol_name = volume.get("volume_name") or str(uuid.uuid4())
+
+        with backup_writer.open("", disk_id) as writer:
+            with nbd.ImageReader(virtual_disk, vol_name) as reader:
+                perc_step = event_manager.add_percentage_step(
+                    reader.export_size,
+                    message_format="Disk copy progress for %s: "
+                                   "{:.0f}%%" % vol_name)
+
+                offset = 0
+                write_offset = 0
+                buff = b''
+                flush = 10 * 1024 * 1024  # 10 MB
+                emptyChunk = b'\0' * chunk
+                written = 0
+                export_size = reader.export_size
+                while offset < export_size:
+                    readBytes = chunk
+                    remaining = export_size - offset
+                    remainingDelta = remaining - chunk
+                    if remainingDelta <= 0:
+                        readBytes = remaining
+                        emptyChunk = emptyChunk[0:readBytes]
+
+                    data = reader.read(offset, readBytes)
+                    if data == emptyChunk:
+                        # got a whole lot of nothing. Not putting
+                        # it on the wire
+                        if len(buff) > 0:
+                            LOG.info("Found empty chunk. Flushing buffer:"
+                                     " %r starting at offset: %r" % (
+                                         len(buff), write_offset))
+                            writer.seek(write_offset)
+                            writer.write(buff)
+                            written += len(buff)
+                            event_manager.set_percentage_step(
+                                perc_step, offset)
+                            buff = b''
+                    else:
+                        if len(buff) == 0:
+                            LOG.info("Found written chunk at %r."
+                                     " Last written offset: %r" % (
+                                         offset, write_offset))
+                            write_offset = offset
+                        buff += data
+                        if len(buff) >= flush or export_size == offset:
+                            writer.seek(write_offset)
+                            writer.write(buff)
+                            written += len(buff)
+                            buff = b''
+                    offset += readBytes
+                    if offset % flush == 0 or export_size == offset:
+                        event_manager.set_percentage_step(
+                            perc_step, offset)
+                event_manager.progress_update(
+                    "Total bytes sent over wire: %r MB" % (
+                        written / 1024 / 1024))
+                emptyChunk = None
+                buff = None
+                data = None
+                gc.collect()

+ 380 - 0
coriolis/nbd.py

@@ -0,0 +1,380 @@
+"""
+There is currently no authentication support, nor does this implement
+TLS in any way. It is a really basic implementation designed to be used
+to stream blocks from images to coriolis workers.
+
+TODO: implement authentication. Looks like qemu-nbd supports something of
+      the sort
+TODO: implement TLS support
+
+With the above 2 implemented, all sorts of fun things could potentially
+be done.
+
+Protocol documentation at:
+
+https://sourceforge.net/p/nbd/code/ci/e6b56c12f8a18e7a7cc253c73d1f63c2cbc41e1b/tree/doc/proto.md
+"""
+
+import struct
+import socket
+import subprocess
+import os
+import netaddr
+import time
+
+from oslo_log import log as logging
+
+LOG = logging.getLogger(__name__)
+
+
+NBD_CMD_READ = 0
+# Not really needed, we only care about read
+NBD_CMD_WRITE = 1
+NBD_CMD_DISC = 2
+NBD_CMD_FLUSH = 3
+NBD_CMD_TRIM = 4
+NBD_CMD_WRITE_ZEROES = 6
+NBD_CMD_BLOCK_STATUS = 7
+NBD_CMD_RESIZE = 8
+
+NBD_INIT_PASSWD = b'NBDMAGIC'
+
+# Option types
+# Client wants to select an export name. After setting this
+# option, we move directly to transfer phase
+NBD_OPT_EXPORT_NAME = 1
+# Abort negotiation and terminate session
+NBD_OPT_ABORT = 2
+# return a list of exports
+NBD_OPT_LIST = 3
+# not in use
+NBD_OPT_PEEK_EXPORT = 4
+# client wants to initiate TLS
+NBD_OPT_STARTTLS = 5
+# Get more detailed info about an export
+NBD_OPT_INFO = 6
+# Client wishes to terminate the handshake phase
+# and move to transmission phase.
+NBD_OPT_GO = 7
+
+
+# Option reply types
+# Sent by server when it accepts the option,
+# and no further data is available
+NBD_REP_ACK = 1
+# A description of an export
+NBD_REP_SERVER = 2
+# detailed description of an aspect of an export
+NBD_REP_INFO = 3
+
+# There are a number of error reply types, all of which are denoted by
+# having bit 31 set. All error replies MAY have some data set, in which
+# case that data is an error message string suitable for display to the user.
+NBD_REP_ERR_UNSUP = 2147483649
+NBD_REP_ERR_POLICY = 2147483650
+NBD_REP_ERR_INVALID = 2147483651
+NBD_REP_ERR_PLATFORM = 2147483652
+NBD_REP_ERR_TLS_REQD = 2147483653
+NBD_REP_ERR_UNKNOWN = 2147483654
+NBD_REP_ERR_SHUTDOWN = 2147483655
+NBD_REP_ERR_BLOCK_SIZE_REQD = 2147483656
+
+# Error values
+EPERM = 1
+EIO = 5
+ENOMEM = 12
+EINVAL = 22
+ENOSPC = 28
+EOVERFLOW = 75
+ESHUTDOWN = 108
+
+# Transmission flags
+NBD_FLAG_HAS_FLAGS = 1 << 0
+NBD_FLAG_READ_ONLY = 1 << 1
+NBD_FLAG_SEND_FLUSH = 1 << 2
+NBD_FLAG_SEND_FUA = 1 << 3
+NBD_FLAG_ROTATIONAL = 1 << 4
+NBD_FLAG_SEND_TRIM = 1 << 5
+NBD_FLAG_SEND_WRITE_ZEROES = 1 << 6
+NBD_FLAG_SEND_DF = 1 << 7
+NBD_FLAG_CAN_MULTI_CONN = 1 << 8
+NBD_FLAG_SEND_BLOCK_STATUS = 1 << 9
+NBD_FLAG_SEND_RESIZE = 1 << 10
+
+# New style server that supports extending
+NBD_FLAG_C_FIXED_NEWSTYLE = 1 << 0
+# Do not send the 128 bytes of empty zeroes
+NBD_FLAG_NO_ZEROES = 1 << 1
+
+NBD_OPTS_MAGIC = 0x49484156454F5054
+NBD_SERVER_REPLY_MAGIC = 0x3e889045565a9
+NBD_CLISERV_MAGIC = 0x420281861253
+NBD_REQUEST_MAGIC = 0x25609513
+NBD_REPLY_MAGIC = 0x67446698
+
+
+class NBDClient(object):
+    """
+    Really basic, READ-ONLY NBD client implementation. Only useful
+    for consuming chunks of an export, or the entire thing.
+
+    WARNING: Do not try to do parallel reads using this class. It will
+             most likely result in garbage data, due to the fact that
+             handles are not properly implemented. That whole song and
+             dance requires more complex code. Sequential reads only
+             at this point please.
+    """
+    def __init__(self, host=None, port=None,
+                 unix_socket=None, export_name=None):
+        self._client_flags = NBD_FLAG_C_FIXED_NEWSTYLE
+        self.export_size = None
+        self.export_name = export_name
+        self._handle = b'1'
+        self.sock = self.connect(
+            host=host,
+            port=port,
+            unix_socket=unix_socket,
+            export_name=export_name)
+
+    def _select_export(self, sock, name):
+        if type(name) is str:
+            name = bytes(name.encode("ascii"))
+        magic = struct.pack('>Q', NBD_OPTS_MAGIC)
+        opt = struct.pack('>L', NBD_OPT_EXPORT_NAME)
+        name_size = struct.pack('>L', len(name))
+
+        payload = magic + opt + name_size + name
+        sock.sendall(payload)
+        response = sock.recv(64)
+        if len(response) == 0:
+            raise Exception("Read failed. Likely export name is wrong")
+        decoded = struct.unpack('>QH', response)
+        return decoded[0]
+
+    def _negotiate(self, sock, name=None):
+        # fetch the init password. If this is invalid, either the
+        # server erred or we are trying to start a negotiation a socket
+        # that is already in transmission phase
+        passwdSize = struct.calcsize('>8s')
+        passwd = struct.unpack('>8s', sock.recv(passwdSize))
+        if passwd[0] != NBD_INIT_PASSWD:
+            raise Exception("Bad NBD passwd: %r --> %r" % (
+                passwd[0], NBD_INIT_PASSWD))
+
+        magicSize = struct.calcsize('>Q')
+        magic = struct.unpack('>Q', sock.recv(magicSize))
+        if magic[0] == int(NBD_CLISERV_MAGIC):
+            # Old style negotiation is not really a negotiation. It's more
+            # like the server saying: "here you go do whatever". Not unlike
+            # a school canteen lunch lady would do when you humbly
+            # (but naively) ask for something edible.
+            LOG.info("Using old style negotiation for %s" % self.export_name)
+            info = struct.unpack('>Q128s', sock.recv(
+                struct.calcsize('>Q128s')))
+            self.export_size = info[0]
+        else:
+            # Looks like we're using new style negotiation.
+            # Export name is required in this situation
+            if name is None:
+                raise ValueError("export name is required for"
+                                 "new style negotiation")
+            # Check that we're using the FIXED_NEWSTYLE
+            # Flags are an unsigned short
+            flags = struct.unpack('>H', sock.recv(struct.calcsize('>H')))
+            needed = flags[0] & NBD_FLAG_C_FIXED_NEWSTYLE
+            if needed != NBD_FLAG_C_FIXED_NEWSTYLE:
+                raise Exception("Server does not support export listing")
+            if flags[0] & NBD_FLAG_NO_ZEROES:
+                self._client_flags |= NBD_FLAG_NO_ZEROES
+            # Send client flags
+            client_flags = struct.pack('>L', self._client_flags)
+            sock.send(client_flags)
+            self.export_size = self._select_export(sock, name)
+
+    def connect(self, host=None, port=None,
+                unix_socket=None, export_name=None):
+        # there is no TLS support. Make sure you only use this
+        # for local connections, or in secure environments
+        sock = None
+        addr = None
+        if socket is not None:
+            # no need to do extra checks, socket will raise
+            # if the supplied path does not exist
+            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+            addr = unix_socket
+
+        if None not in (host, port):
+            ipVersion = netaddr.IPNetwork(host).version
+            inet = socket.AF_INET
+            if ipVersion == 6:
+                inet = socket.AF_INET6
+            sock = socket.socket(inet, socket.SOCK_STREAM)
+            addr = (host, port)
+
+        if sock is None:
+            raise Exception("either host/port or socket needs to be set")
+
+        try:
+            sock.connect(addr)
+        except socket.error as err:
+            if err.errno == 106:
+                # already connected, just return
+                # WARNING: this assumes that negotiation
+                #          has already happened
+                return sock
+            raise
+        self._negotiate(sock, name=export_name)
+        self.sock = sock
+        return self.sock
+
+    def close(self):
+        request = struct.pack(
+            '>LL8sQL',
+            NBD_REQUEST_MAGIC,
+            NBD_CMD_DISC,
+            self._handle,
+            0,
+            0)
+        self.sock.send(request)
+        self.sock.close()
+        self.sock = None
+        self.export_size = None
+
+    def read(self, offset, length):
+        if offset > self.export_size:
+            raise ValueError("Offset is outside the size of export")
+        readEnd = offset + length
+        if readEnd > self.export_size:
+            length = self.export_size - offset
+        request = struct.pack(
+            '>LL8sQL',
+            NBD_REQUEST_MAGIC,
+            NBD_CMD_READ,
+            # WARNING: this function is not safe for
+            #          concurrent reads! Must not run
+            #          read() in parallel.
+            # TODO: Implement concurrency. Needs to treat
+            #       handles appropriately. Responses are treated
+            #       asynchronously  and may come out of order
+            self._handle,
+            offset,
+            length)
+
+        self.sock.send(request)
+        responseSize = struct.calcsize('>LL8s')
+        response = self.sock.recv(responseSize)
+        magic, error, handle = struct.unpack('>LL8s', response)
+        if magic != int(NBD_REPLY_MAGIC):
+            raise ValueError(
+                "Got invalid response from "
+                "server: %r --> %r --> %r" % (magic, error, handle))
+        if error != 0:
+            raise Exception(
+                "Got invalid response from "
+                "server: %r --> %r --> %r" % (magic, error, handle))
+        got = b''
+        while len(got) < length:
+            more = self.sock.recv(length - len(got))
+            if more == "":
+                raise Exception(length)
+            got += more
+        return got
+
+
+class ImageReader(object):
+
+    def __init__(self, path, name):
+        """
+        param: path: str: The path to the virtual disk image you want to read
+        param: name: str: The name of the export
+        """
+        self.image_path = path
+        self.export_name = name
+        self.socket_path = "/tmp/%s.sock" % self.export_name
+        self._nbd_client = None
+        self._qemu_process = None
+
+    @property
+    def export_size(self):
+        if self._nbd_client is not None:
+            return self._nbd_client.export_size
+        return None
+
+    def _wait_for_socket(self, process, socket_path):
+        count = 0
+        while True:
+            # arbitrary. wait 5 seconds
+            if count >= 50:
+                raise TimeoutError("timed out waiting for"
+                                   " socket: %s" % socket_path)
+            status = process.poll()
+            if status:
+                stdout, stderr = process.communicate()
+                raise Exception("process failed with status: %r" % status)
+            if os.path.exists(socket_path):
+                return socket_path
+            time.sleep(0.1)
+
+    def _supports_newstyle(self):
+        for i in subprocess.check_output(["qemu-nbd", "-h"]).splitlines():
+            if b'--export-name' in i:
+                return True
+        return False
+
+    def connect(self):
+        if os.path.isfile(self.image_path) is False:
+            raise ValueError("Image file %s does not exist" % self.image_path)
+        if self._qemu_process is not None:
+            raise Exception("qemu-nbd is already running")
+        if self._nbd_client is not None:
+            raise Exception("client already created")
+
+        if os.path.exists(self.socket_path):
+            raise Exception("socket %s already exists" % self.socket_path)
+
+        qemu_cmd = [
+            "qemu-nbd", "-k", self.socket_path,
+            self.image_path
+        ]
+
+        if self._supports_newstyle():
+            qemu_cmd = [
+                "qemu-nbd", "-k", self.socket_path,
+                "-x", self.export_name, self.image_path
+            ]
+
+        LOG.info("Running command: %s" % ' '.join(qemu_cmd))
+        self._qemu_process = subprocess.Popen(qemu_cmd)
+        self._wait_for_socket(self._qemu_process, self.socket_path)
+
+        self._nbd_client = NBDClient(
+            unix_socket=self.socket_path,
+            export_name=self.export_name)
+
+    def close(self):
+        if self._nbd_client:
+            self._nbd_client.close()
+            self._nbd_client = None
+        if self._qemu_process:
+            # the qemu-nbd binary should already have exited on
+            # self._nbd_client.close(), but we kill it for good
+            # measure.
+            self._qemu_process.kill()
+            self._qemu_process.wait()
+            self._qemu_process = None
+        try:
+            os.remove(self.socket_path)
+        except BaseException:
+            pass
+
+    def read(self, offset, length):
+        if self._nbd_client is None:
+            raise Exception("not initialized properly")
+        return self._nbd_client.read(offset, length)
+
+    def __enter__(self):
+        self.connect()
+        return self
+
+    def __exit__(self, *args):
+        self.close()

+ 10 - 0
coriolis/providers/base.py

@@ -102,6 +102,16 @@ class BaseImportProvider(BaseImportInstanceProvider):
         """
         pass
 
+    @abc.abstractmethod
+    def deploy_disk_copy_worker(self, ctxt, connection_info,
+                                target_environment, volumes_info):
+        pass
+
+    @abc.abstractmethod
+    def delete_disk_copy_worker(self, ctxt, connection_info,
+                                target_resources_dict):
+        pass
+
     @abc.abstractmethod
     def finalize_import_instance(self, ctxt, connection_info,
                                  instance_deployment_info):

+ 6 - 0
coriolis/tasks/factory.py

@@ -14,6 +14,12 @@ _TASKS_MAP = {
         migration_tasks.ImportInstanceTask,
     constants.TASK_TYPE_FINALIZE_IMPORT_INSTANCE:
         migration_tasks.FinalizeImportInstanceTask,
+    constants.TASK_TYPE_DEPLOY_DISK_COPY_WORKER:
+        migration_tasks.DeployDiskCopyWorker,
+    constants.TASK_TYPE_COPY_DISK_DATA:
+        migration_tasks.CopyDiskData,
+    constants.TASK_TYPE_DELETE_DISK_COPY_WORKER:
+        migration_tasks.DeleteDiskCopyWorker,
     constants.TASK_TYPE_CLEANUP_FAILED_IMPORT_INSTANCE:
         migration_tasks.CleanupFailedImportInstanceTask,
     constants.TASK_TYPE_DEPLOY_OS_MORPHING_RESOURCES:

+ 74 - 0
coriolis/tasks/migration_tasks.py

@@ -4,6 +4,8 @@
 from coriolis import constants
 from coriolis.providers import factory as providers_factory
 from coriolis import schemas
+from coriolis import exception
+from coriolis.migrations import manager
 from coriolis.tasks import base
 
 from oslo_log import log as logging
@@ -49,6 +51,78 @@ class ImportInstanceTask(base.TaskRunner):
 
         task_info["origin_provider_type"] = constants.PROVIDER_TYPE_EXPORT
         task_info["destination_provider_type"] = constants.PROVIDER_TYPE_IMPORT
+        # We need to retain export info until after disk sync
+        # TODO: remove this when we implement multi-worker, and by extension
+        #       some external storage for needed resources (like swift)
+        task_info["retain_export_path"] = True
+
+        return task_info
+
+
+class DeployDiskCopyWorker(base.TaskRunner):
+    def run(self, ctxt, instance, origin, destination, task_info,
+            event_handler):
+        provider = providers_factory.get_provider(
+            destination["type"], constants.PROVIDER_TYPE_IMPORT, event_handler)
+        connection_info = base.get_connection_info(ctxt, destination)
+        target_environment = destination.get("target_environment") or {}
+        instance_deployment_info = task_info["instance_deployment_info"]
+
+        resources_info = provider.deploy_disk_copy_worker(
+            ctxt, connection_info, target_environment,
+            instance_deployment_info)
+
+        conn_info = resources_info[
+            "instance_deployment_info"]["disk_sync_connection_info"]
+        conn_info = base.marshal_migr_conn_info(conn_info)
+        task_info["instance_deployment_info"] = resources_info[
+            "instance_deployment_info"]
+        task_info["instance_deployment_info"][
+            "disk_sync_connection_info"] = conn_info
+        # We need to retain export info until after disk sync
+        # TODO: remove this when we implement multi-worker, and by extension
+        #       some external storage for needed resources (like swift)
+        task_info["retain_export_path"] = True
+
+        return task_info
+
+
+class CopyDiskData(base.TaskRunner):
+    def run(self, ctxt, instance, origin, destination, task_info,
+            event_handler):
+        instance_deployment_info = task_info["instance_deployment_info"]
+        volumes_info = instance_deployment_info["volumes_info"]
+        LOG.info("Volumes info is: %r" % volumes_info)
+
+        image_paths = [i.get("disk_image_uri") for i in volumes_info]
+        if None in image_paths:
+            raise exception.InvalidActionTasksExecutionState(
+                "disk_image_uri must be part of volumes_info for"
+                " standard migrations")
+
+        target_conn_info = base.unmarshal_migr_conn_info(
+            instance_deployment_info["disk_sync_connection_info"])
+        manager.copy_disk_data(
+            target_conn_info, volumes_info, event_handler)
+
+        return task_info
+
+
+class DeleteDiskCopyWorker(base.TaskRunner):
+    def run(self, ctxt, instance, origin, destination, task_info,
+            event_handler):
+        provider = providers_factory.get_provider(
+            destination["type"], constants.PROVIDER_TYPE_IMPORT, event_handler)
+        connection_info = base.get_connection_info(ctxt, destination)
+        instance_deployment_info = task_info.get(
+            "instance_deployment_info", {})
+        provider.delete_disk_copy_worker(
+            ctxt, connection_info, instance_deployment_info)
+
+        if instance_deployment_info.get("disk_sync_connection_info"):
+            del instance_deployment_info["disk_sync_connection_info"]
+        if instance_deployment_info.get("disk_sync_tgt_resources"):
+            del instance_deployment_info["disk_sync_tgt_resources"]
 
         return task_info