View source code

Adds replica support in VMware and OpenStack providers

Alessandro Pilotti 9 years ago
Parent
Commit
8b82797f05

+ 51 - 1
coriolis/providers/base.py

@@ -51,7 +51,6 @@ class BaseImportProvider(BaseProvider):
 
         return True
 
-
     @abc.abstractmethod
     def import_instance(self, ctxt, connection_info, target_environment,
                         instance_name, export_info):
@@ -62,6 +61,40 @@ class BaseImportProvider(BaseProvider):
         pass
 
 
+class BaseReplicaImportProvider(BaseImportProvider):
+    __metaclass__ = abc.ABCMeta
+
+    @abc.abstractmethod
+    def deploy_replica_instance(self, ctxt, connection_info,
+                                target_environment, instance_name, export_info,
+                                volumes_info):
+        pass
+
+    @abc.abstractmethod
+    def deploy_replica_disks(self, ctxt, connection_info, target_environment,
+                             instance_name, export_info):
+        pass
+
+    @abc.abstractmethod
+    def update_replica_disks(self, ctxt, connection_info, target_environment,
+                             instance_name, export_info, volumes_info):
+        pass
+
+    @abc.abstractmethod
+    def deploy_replica_resources(self, ctxt, connection_info,
+                                 target_environment, volumes_info):
+        pass
+
+    @abc.abstractmethod
+    def delete_replica_resources(self, ctxt, connection_info,
+                                 target_replica_info):
+        pass
+
+    @abc.abstractmethod
+    def delete_replica_disks(self, ctxt, connection_info, volumes_info):
+        pass
+
+
 class BaseExportProvider(BaseProvider):
     __metaclass__ = abc.ABCMeta
 
@@ -72,3 +105,20 @@ class BaseExportProvider(BaseProvider):
         to the provided export directory path using the given connection info.
         """
         pass
+
+
+class BaseReplicaExportProvider(BaseExportProvider):
+    __metaclass__ = abc.ABCMeta
+
+    @abc.abstractmethod
+    def get_replica_instance_info(self, ctxt, connection_info, instance_name):
+        pass
+
+    @abc.abstractmethod
+    def replicate_disks(self, ctxt, connection_info, instance_name,
+                        target_conn_info, volumes_info, incremental):
+        pass
+
+    @abc.abstractmethod
+    def shutdown_instance(self, ctxt, connection_info, instance_name):
+        pass
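
For orientation, the call sequence tying these abstract methods together lives in the service code that drives providers, which is not part of this diff. The sketch below shows one plausible replica pass; run_replica_cycle and all of its argument names are hypothetical.

def run_replica_cycle(export_provider, import_provider, ctxt, src_conn,
                      dst_conn, target_env, instance_name,
                      volumes_info=None):
    # Validate the source VM and collect its metadata
    export_info = export_provider.get_replica_instance_info(
        ctxt, src_conn, instance_name)

    incremental = volumes_info is not None
    if not incremental:
        # First pass: create one empty target volume per source disk
        volumes_info = import_provider.deploy_replica_disks(
            ctxt, dst_conn, target_env, instance_name, export_info)

    # Boot a worker VM on the target and attach the replica volumes
    replica_info = import_provider.deploy_replica_resources(
        ctxt, dst_conn, target_env, volumes_info)
    try:
        # Stream (changed) blocks from the source disks to the volumes
        volumes_info = export_provider.replicate_disks(
            ctxt, src_conn, instance_name,
            replica_info["connection_info"], replica_info["volumes"],
            incremental)
    finally:
        # Tear down the worker VM; the volumes themselves are kept
        import_provider.delete_replica_resources(
            ctxt, dst_conn, replica_info)

    # A failover would then call import_provider.deploy_replica_instance()
    # with the final volumes_info
    return volumes_info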

+ 329 - 100
coriolis/providers/openstack/__init__.py

@@ -87,6 +87,9 @@ MIGR_USER_DATA = (
 MIGR_GUEST_USERNAME = 'cloudbase'
 MIGR_GUEST_USERNAME_WINDOWS = "admin"
 
+VOLUME_NAME_FORMAT = "%(instance_name)s %(num)s"
+REPLICA_VOLUME_NAME_FORMAT = "Coriolis Replica - %(instance_name)s %(num)s"
+
 LOG = logging.getLogger(__name__)
 
 
@@ -121,16 +124,32 @@ def _wait_for_instance(nova, instance, expected_status='ACTIVE'):
 
 
 @utils.retry_on_error()
-def _wait_for_volume(nova, volume, expected_status='in-use'):
-    volume = nova.volumes.findall(id=volume.id)[0]
+def _create_volume(cinder, size, name, image_ref=None):
+    volume_size_gb = math.ceil(size / units.Gi)
+    return cinder.volumes.create(
+        size=volume_size_gb,
+        name=name,
+        imageRef=image_ref)
+
+
+@utils.retry_on_error()
+def _wait_for_volume(cinder, volume, expected_status='in-use'):
+    volume = cinder.volumes.findall(id=volume.id)[0]
     while volume.status not in [expected_status, 'error']:
         time.sleep(2)
-        volume = nova.volumes.get(volume.id)
+        volume = cinder.volumes.get(volume.id)
     if volume.status != expected_status:
         raise exception.CoriolisException(
             "Volume is in status: %s" % volume.status)
 
 
+@utils.retry_on_error()
+def _delete_volume(cinder, volume_id):
+    volumes = cinder.volumes.findall(id=volume_id)
+    for volume in volumes:
+        volume.delete()
+
+
 class _MigrationResources(object):
     def __init__(self, nova, neutron, keypair, instance, port,
                  floating_ip, guest_port, sec_group, username, password, k):
@@ -146,6 +165,53 @@ class _MigrationResources(object):
         self._username = username
         self._password = password
 
+    def get_resources_dict(self):
+        return {
+            "instance_id": self._instance.id,
+            "keypair_name": self._keypair.name,
+            "port_id": self._port["id"],
+            "floating_ip_id": self._floating_ip.id,
+            "secgroup_id": self._sec_group.id,
+        }
+
+    @classmethod
+    @utils.retry_on_error()
+    def from_resources_dict(cls, nova, neutron, resources_dict):
+        instance_id = resources_dict["instance_id"]
+        keypair_name = resources_dict["keypair_name"]
+        floating_ip_id = resources_dict["floating_ip_id"]
+        secgroup_id = resources_dict["secgroup_id"]
+        port_id = resources_dict["port_id"]
+
+        instance = None
+        instances = nova.servers.findall(id=instance_id)
+        if instances:
+            instance = instances[0]
+
+        keypair = None
+        keypairs = nova.keypairs.findall(name=keypair_name)
+        if keypairs:
+            keypair = keypairs[0]
+
+        floating_ip = None
+        floating_ips = nova.floating_ips.findall(id=floating_ip_id)
+        if floating_ips:
+            floating_ip = floating_ips[0]
+
+        sec_group = None
+        sec_groups = nova.security_groups.findall(id=secgroup_id)
+        if sec_groups:
+            sec_group = sec_groups[0]
+
+        port = None
+        ports = neutron.list_ports(id=port_id)["ports"]
+        if ports:
+            port = ports[0]
+
+        return cls(
+            nova, neutron, keypair, instance, port, floating_ip, None,
+            sec_group, None, None, None)
+
     def get_guest_connection_info(self):
         return {
             "ip": self._floating_ip.ip,
@@ -188,7 +254,7 @@ class _MigrationResources(object):
             self._keypair = None
 
 
-class ImportProvider(base.BaseImportProvider):
+class ImportProvider(base.BaseReplicaImportProvider):
 
     connection_info_schema = schemas.get_schema(
         __name__, schemas.PROVIDER_CONNECTION_INFO_SCHEMA_NAME)
@@ -276,7 +342,7 @@ class ImportProvider(base.BaseImportProvider):
             os.close(fd)
             os.remove(key_path)
 
-    @utils.retry_on_error()
+    @utils.retry_on_error(max_attempts=10, sleep_seconds=30)
     def _deploy_migration_resources(self, nova, glance, neutron,
                                     os_type, migr_image_name, migr_flavor_name,
                                     migr_network_name, migr_fip_pool_name):
@@ -381,25 +447,29 @@ class ImportProvider(base.BaseImportProvider):
             raise
 
     @utils.retry_on_error()
-    def _attach_volume(self, nova, instance, volume, volume_dev=None):
-        nova.volumes.create_server_volume(
-            instance.id, volume.id, volume_dev)
-        _wait_for_volume(nova, volume, 'in-use')
+    def _attach_volume(self, nova, cinder, instance, volume_id,
+                       volume_dev=None):
+        # volume_id can be either a Volume object or an id
+        volume = nova.volumes.create_server_volume(
+            instance.id, volume_id, volume_dev)
+        _wait_for_volume(cinder, volume, 'in-use')
+        return volume
 
     def _get_import_config(self, target_environment, os_type):
         config = collections.namedtuple(
-            "glance_upload",
-            "target_disk_format",
-            "container_format",
-            "hypervisor_type",
-            "fip_pool_name",
-            "network_map",
-            "keypair_name",
-            "migr_image_name",
-            "migr_flavor_name",
-            "migr_fip_pool_name",
-            "migr_network_name",
-            "flavor_name")
+            "ImportConfig",
+            ["glance_upload",
+             "target_disk_format",
+             "container_format",
+             "hypervisor_type",
+             "fip_pool_name",
+             "network_map",
+             "keypair_name",
+             "migr_image_name",
+             "migr_flavor_name",
+             "migr_fip_pool_name",
+             "migr_network_name",
+             "flavor_name"])
 
         config.glance_upload = target_environment.get(
             "glance_upload", CONF.openstack_migration_provider.glance_upload)
@@ -450,8 +520,99 @@ class ImportProvider(base.BaseImportProvider):
 
         return config
 
-    def import_instance(self, ctxt, connection_info, target_environment,
-                        instance_name, export_info):
+    def _create_images_and_volumes(self, glance, nova, cinder, config,
+                                   disks_info):
+        if not config.glance_upload:
+            raise exception.CoriolisException(
+                "Glance upload is currently required for migrations")
+
+        images = []
+        volumes = []
+
+        for disk_info in disks_info:
+            disk_path = disk_info["path"]
+            disk_file_info = utils.get_disk_info(disk_path)
+
+            # if config.target_disk_format == disk_file_info["format"]:
+            #    target_disk_path = disk_path
+            # else:
+            #    target_disk_path = (
+            #        "%s.%s" % (os.path.splitext(disk_path)[0],
+            #                   config.target_disk_format))
+            #    utils.convert_disk_format(disk_path, target_disk_path,
+            #                              config.target_disk_format)
+
+            self._event_manager.progress_update(
+                "Uploading Glance image")
+
+            disk_format = disk_file_info["format"]
+            image = self._create_image(
+                glance, _get_unique_name(),
+                disk_path, disk_format,
+                config.container_format,
+                config.hypervisor_type)
+            images.append(image)
+
+            self._event_manager.progress_update(
+                "Waiting for Glance image to become active")
+            _wait_for_image(nova, image.id)
+
+            virtual_disk_size = disk_file_info["virtual-size"]
+            if disk_format != constants.DISK_FORMAT_RAW:
+                virtual_disk_size += DISK_HEADER_SIZE
+
+            self._event_manager.progress_update(
+                "Creating Cinder volume")
+
+            volume = _create_volume(
+                cinder, virtual_disk_size, _get_unique_name(), image.id)
+            volumes.append(volume)
+
+        return images, volumes
+
+    def _create_neutron_ports(self, neutron, config, nics_info):
+        ports = []
+
+        for nic_info in nics_info:
+            origin_network_name = nic_info.get("network_name")
+            if not origin_network_name:
+                self._warn("Origin network name not provided for nic: "
+                           "%s, skipping", nic_info.get("name"))
+                continue
+
+            network_name = config.network_map.get(origin_network_name)
+            if not network_name:
+                raise exception.CoriolisException(
+                    "Network not mapped in network_map: %s" %
+                    origin_network_name)
+
+            ports.append(self._create_neutron_port(
+                neutron, network_name, nic_info.get("mac_address")))
+
+        return ports
+
+    @utils.retry_on_error()
+    def _get_replica_volumes(self, cinder, volumes_info):
+        volumes = []
+        for volume_id in [v["volume_id"] for v in volumes_info]:
+            volumes.append(cinder.volumes.get(volume_id))
+        return volumes
+
+    @utils.retry_on_error()
+    def _rename_volumes(self, cinder, volumes, instance_name):
+        for i, volume in enumerate(volumes):
+            new_volume_name = VOLUME_NAME_FORMAT % {
+                "instance_name": instance_name, "num": i + 1}
+            cinder.volumes.update(volume.id, name=new_volume_name)
+
+    @utils.retry_on_error()
+    def _set_bootable_volumes(self, cinder, volumes):
+        # TODO: check if just setting the first volume as bootable is enough
+        for volume in volumes:
+            if not volume.bootable or volume.bootable == 'false':
+                cinder.volumes.set_bootable(volume, True)
+
+    def _deploy_instance(self, ctxt, connection_info, target_environment,
+                         instance_name, export_info, volumes_info=None):
         session = keystone.create_keystone_session(ctxt, connection_info)
 
         glance_api_version = connection_info.get("image_api_version",
@@ -467,55 +628,19 @@ class ImportProvider(base.BaseImportProvider):
 
         config = self._get_import_config(target_environment, os_type)
 
-        disks_info = export_info["devices"]["disks"]
-
         images = []
         volumes = []
         ports = []
 
         try:
-            if config.glance_upload:
-                for disk_info in disks_info:
-                    disk_path = disk_info["path"]
-                    disk_file_info = utils.get_disk_info(disk_path)
-
-                    # if config.target_disk_format == disk_file_info["format"]:
-                    #    target_disk_path = disk_path
-                    # else:
-                    #    target_disk_path = (
-                    #        "%s.%s" % (os.path.splitext(disk_path)[0],
-                    #                   config.target_disk_format))
-                    #    utils.convert_disk_format(disk_path, target_disk_path,
-                    #                              config.target_disk_format)
-
-                    self._event_manager.progress_update(
-                        "Uploading Glance image")
-
-                    disk_format = disk_file_info["format"]
-                    image = self._create_image(
-                        glance, _get_unique_name(),
-                        disk_path, disk_format,
-                        config.container_format,
-                        config.hypervisor_type)
-                    images.append(image)
-
-                    self._event_manager.progress_update(
-                        "Waiting for Glance image to become active")
-                    _wait_for_image(nova, image.id)
-
-                    virtual_disk_size = disk_file_info["virtual-size"]
-                    if disk_format != constants.DISK_FORMAT_RAW:
-                        virtual_disk_size += DISK_HEADER_SIZE
-
-                    self._event_manager.progress_update(
-                        "Creating Cinder volume")
-
-                    volume_size_gb = math.ceil(virtual_disk_size / units.Gi)
-                    volume = nova.volumes.create(
-                        size=volume_size_gb,
-                        display_name=_get_unique_name(),
-                        imageRef=image.id)
-                    volumes.append(volume)
+            if not volumes_info:
+                # Migration
+                disks_info = export_info["devices"]["disks"]
+                images, volumes = self._create_images_and_volumes(
+                    glance, nova, cinder, config, disks_info)
+            else:
+                # Replica
+                volumes = self._get_replica_volumes(cinder, volumes_info)
 
             migr_resources = self._deploy_migration_resources(
                 nova, glance, neutron, os_type, config.migr_image_name,
@@ -526,13 +651,13 @@ class ImportProvider(base.BaseImportProvider):
 
             try:
                 for i, volume in enumerate(volumes):
-                    _wait_for_volume(nova, volume, 'available')
+                    _wait_for_volume(cinder, volume, 'available')
 
                     self._event_manager.progress_update(
                         "Attaching volume to worker instance")
 
-                    self._attach_volume(nova, migr_resources.get_instance(),
-                                        volume)
+                    self._attach_volume(
+                        nova, cinder, migr_resources.get_instance(), volume.id)
 
                     conn_info = migr_resources.get_guest_connection_info()
 
@@ -553,44 +678,31 @@ class ImportProvider(base.BaseImportProvider):
                 migr_resources.delete()
 
             self._event_manager.progress_update("Renaming volumes")
+            self._rename_volumes(cinder, volumes, instance_name)
 
-            for i, volume in enumerate(volumes):
-                new_volume_name = "%s %s" % (instance_name, i + 1)
-                cinder.volumes.update(volume.id, name=new_volume_name)
-
-            for nic_info in nics_info:
-                self._event_manager.progress_update(
-                    "Creating Neutron port for migrated instance")
-
-                origin_network_name = nic_info.get("network_name")
-                if not origin_network_name:
-                    self._warn("Origin network name not provided for for nic: "
-                               "%s, skipping", nic_info.get("name"))
-                    continue
-
-                network_name = config.network_map.get(origin_network_name)
-                if not network_name:
-                    raise exception.CoriolisException(
-                        "Network not mapped in network_map: %s" %
-                        origin_network_name)
+            self._event_manager.progress_update(
+                "Ensuring volumes are bootable")
+            self._set_bootable_volumes(cinder, volumes)
 
-                ports.append(self._create_neutron_port(
-                    neutron, network_name, nic_info.get("mac_address")))
+            self._event_manager.progress_update(
+                "Creating Neutron ports for migrated instance")
+            ports = self._create_neutron_ports(neutron, config, nics_info)
 
             self._event_manager.progress_update(
                 "Creating migrated instance")
-
             self._create_target_instance(
                 nova, config.flavor_name, instance_name,
                 config.keypair_name, ports, volumes)
         except Exception:
-            self._event_manager.progress_update("Deleting volumes")
-            for volume in volumes:
-                @utils.ignore_exceptions
-                @utils.retry_on_error()
-                def _del_volume():
-                    volume.delete()
-                _del_volume()
+            if not volumes_info:
+                # Don't remove replica volumes
+                self._event_manager.progress_update("Deleting volumes")
+                for volume in volumes:
+                    @utils.ignore_exceptions
+                    @utils.retry_on_error()
+                    def _del_volume():
+                        volume.delete()
+                    _del_volume()
             self._event_manager.progress_update("Deleting Neutron ports")
             for port in ports:
                 @utils.ignore_exceptions
@@ -608,6 +720,11 @@ class ImportProvider(base.BaseImportProvider):
                     image.delete()
                 _del_image()
 
+    def import_instance(self, ctxt, connection_info, target_environment,
+                        instance_name, export_info):
+        self._deploy_instance(ctxt, connection_info, target_environment,
+                              instance_name, export_info)
+
     def _get_osmorphing_hypervisor_type(self, hypervisor_type):
         if (hypervisor_type and
                 hypervisor_type.lower() == constants.HYPERVISOR_QEMU):
@@ -615,7 +732,7 @@ class ImportProvider(base.BaseImportProvider):
         elif hypervisor_type:
             return hypervisor_type.lower()
 
-    @utils.retry_on_error()
+    @utils.retry_on_error(max_attempts=10, sleep_seconds=30)
     def _create_target_instance(self, nova, flavor_name, instance_name,
                                 keypair_name, ports, volumes):
         flavor = nova.flavors.find(name=flavor_name)
@@ -646,6 +763,118 @@ class ImportProvider(base.BaseImportProvider):
                 nova.servers.delete(instance)
             raise
 
+    def deploy_replica_instance(self, ctxt, connection_info,
+                                target_environment, instance_name, export_info,
+                                volumes_info):
+        self._deploy_instance(ctxt, connection_info, target_environment,
+                              instance_name, export_info, volumes_info)
+
+    def deploy_replica_disks(self, ctxt, connection_info, target_environment,
+                             instance_name, export_info):
+        session = keystone.create_keystone_session(ctxt, connection_info)
+
+        cinder = cinder_client.Client(CINDER_API_VERSION, session=session)
+
+        volumes_info = []
+        volumes = []
+        try:
+            disks_info = export_info["devices"]["disks"]
+            for i, disk_info in enumerate(disks_info):
+                self._event_manager.progress_update(
+                    "Creating volume")
+
+                disk_id = disk_info["id"]
+                virtual_disk_size = disk_info["size_bytes"]
+                volume_name = REPLICA_VOLUME_NAME_FORMAT % {
+                    "instance_name": instance_name, "num": i + 1}
+                volume = _create_volume(
+                    cinder, virtual_disk_size, volume_name)
+                volumes.append(volume)
+                volumes_info.append({
+                    "volume_id": volume.id,
+                    "disk_id": disk_id})
+
+            for volume in volumes:
+                _wait_for_volume(cinder, volume, 'available')
+
+            return volumes_info
+        except Exception:
+            for volume in volumes:
+                _delete_volume(cinder, volume.id)
+            raise
+
+    def update_replica_disks(self, ctxt, connection_info, target_environment,
+                             instance_name, export_info, volumes_info):
+        # TODO: check if source disk size / number changed and update
+        # accordingly on destination
+        return volumes_info
+
+    def deploy_replica_resources(self, ctxt, connection_info,
+                                 target_environment, volumes_info):
+        session = keystone.create_keystone_session(ctxt, connection_info)
+
+        glance_api_version = connection_info.get("image_api_version",
+                                                 GLANCE_API_VERSION)
+        nova = nova_client.Client(NOVA_API_VERSION, session=session)
+        glance = glance_client.Client(glance_api_version, session=session)
+        neutron = neutron_client.Client(NEUTRON_API_VERSION, session=session)
+        cinder = cinder_client.Client(CINDER_API_VERSION, session=session)
+
+        # Data migration uses a Linux guest binary
+        os_type = constants.OS_TYPE_LINUX
+
+        config = self._get_import_config(target_environment, os_type)
+
+        migr_resources = self._deploy_migration_resources(
+            nova, glance, neutron, os_type, config.migr_image_name,
+            config.migr_flavor_name, config.migr_network_name,
+            config.migr_fip_pool_name)
+
+        try:
+            for volume_info in volumes_info:
+                self._event_manager.progress_update(
+                    "Attaching volume to worker instance")
+
+                volume_id = volume_info["volume_id"]
+                ret_volume = self._attach_volume(
+                    nova, cinder, migr_resources.get_instance(), volume_id)
+                volume_info["volume_dev"] = ret_volume.device
+
+            return {
+                "migr_resources": migr_resources.get_resources_dict(),
+                "volumes": volumes_info,
+                "connection_info": migr_resources.get_guest_connection_info(),
+            }
+        except Exception:
+            self._event_manager.progress_update(
+                "Removing worker instance resources")
+            migr_resources.delete()
+            raise
+
+    def delete_replica_resources(self, ctxt, connection_info,
+                                 target_replica_info):
+        session = keystone.create_keystone_session(ctxt, connection_info)
+
+        nova = nova_client.Client(NOVA_API_VERSION, session=session)
+        neutron = neutron_client.Client(NEUTRON_API_VERSION, session=session)
+
+        migr_resources_dict = target_replica_info["migr_resources"]
+        migr_resources = _MigrationResources.from_resources_dict(
+            nova, neutron, migr_resources_dict)
+        self._event_manager.progress_update(
+            "Removing worker instance resources")
+        migr_resources.delete()
+
+    def delete_replica_disks(self, ctxt, connection_info, volumes_info):
+        session = keystone.create_keystone_session(ctxt, connection_info)
+
+        cinder = cinder_client.Client(CINDER_API_VERSION, session=session)
+
+        self._event_manager.progress_update(
+            "Removing replica disk volumes")
+        for volume_info in volumes_info:
+            _delete_volume(cinder, volume_info["volume_id"])
+
 
 class ExportProvider(base.BaseExportProvider):
     _OS_DISTRO_MAP = {
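
As a reading aid, the volumes_info list that threads through the replica methods above has roughly the following shape; the field names are taken from the code, but the concrete values are illustrative only.

# After deploy_replica_disks: one entry per source disk
volumes_info = [{"volume_id": "6a9f...", "disk_id": 2000}]

# After deploy_replica_resources: each entry gains the device path under
# which the volume was attached to the worker instance
volumes_info = [{"volume_id": "6a9f...", "disk_id": 2000,
                 "volume_dev": "/dev/vdb"}]

# After replicate_disks (VMware side): each entry also records the CBT
# change id to use as the baseline for the next incremental pass
volumes_info = [{"volume_id": "6a9f...", "disk_id": 2000,
                 "volume_dev": "/dev/vdb", "change_id": "52 7c.../1"}]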

+ 240 - 40
coriolis/providers/vmware_vsphere/__init__.py

@@ -1,9 +1,11 @@
 # Copyright 2016 Cloudbase Solutions Srl
 # All Rights Reserved.
 
+import abc
 import contextlib
 import os
 import re
+import struct
 import sys
 import time
 from urllib import request
@@ -12,6 +14,7 @@ import uuid
 import eventlet
 from oslo_config import cfg
 from oslo_log import log as logging
+import paramiko
 from pyVim import connect
 from pyVmomi import vim
 
@@ -35,7 +38,144 @@ CONF.register_opts(vmware_vsphere_opts, 'vmware_vsphere')
 LOG = logging.getLogger(__name__)
 
 
-class ExportProvider(base.BaseExportProvider):
+class _BaseBackupWriter(metaclass=abc.ABCMeta):
+    @abc.abstractmethod
+    def _open(self):
+        pass
+
+    @contextlib.contextmanager
+    def open(self, path, disk_id):
+        self._path = path
+        self._disk_id = disk_id
+        self._open()
+        try:
+            yield self
+        finally:
+            self.close()
+
+    @abc.abstractmethod
+    def seek(self, pos):
+        pass
+
+    @abc.abstractmethod
+    def truncate(self, size):
+        pass
+
+    @abc.abstractmethod
+    def write(self, data):
+        pass
+
+    @abc.abstractmethod
+    def close(self):
+        pass
+
+
+class _FileBackupWriter(_BaseBackupWriter):
+    def _open(self):
+        # Create the file if it doesn't exist
+        open(self._path, 'ab+').close()
+        self._file = open(self._path, 'rb+')
+
+    def seek(self, pos):
+        self._file.seek(pos)
+
+    def truncate(self, size):
+        self._file.truncate(size)
+
+    def write(self, data):
+        self._file.write(data)
+
+    def close(self):
+        self._file.close()
+
+
+class _SSHBackupWriter(_BaseBackupWriter):
+    def __init__(self, ip, port, username, pkey, password, volumes_info):
+        self._ip = ip
+        self._port = port
+        self._username = username
+        self._pkey = pkey
+        self._password = password
+        self._volumes_info = volumes_info
+
+    @utils.retry_on_error()
+    def _connect_ssh(self):
+        LOG.info("Connecting to SSH host: %(ip)s:%(port)s" %
+                 {"ip": self._ip, "port": self._port})
+        self._ssh = paramiko.SSHClient()
+        self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+        self._ssh.connect(
+            hostname=self._ip,
+            port=self._port,
+            username=self._username,
+            pkey=self._pkey,
+            password=self._password)
+
+    @utils.retry_on_error()
+    def _copy_helper_cmd(self):
+        sftp = self._ssh.open_sftp()
+        local_path = os.path.join(
+            utils.get_resources_dir(), 'write_data')
+        sftp.put(local_path, 'write_data')
+        sftp.close()
+
+    @utils.retry_on_error()
+    def _exec_helper_cmd(self):
+        self._msg_id = 0
+        self._offset = 0
+        self._stdin, self._stdout, self._stderr = self._ssh.exec_command(
+            "chmod +x write_data && sudo ./write_data")
+
+    def _encode_data(self, content):
+        path = [v for v in self._volumes_info
+                if v["disk_id"] == self._disk_id][0]["volume_dev"]
+
+        LOG.info("Guest path: %s", path)
+        LOG.info("Offset: %s", self._offset)
+        LOG.info("Content len: %s", len(content))
+
+        data_len = len(path) + 1 + 8 + len(content)
+        return (struct.pack("<I", self._msg_id) +
+                struct.pack("<I", data_len) +
+                path.encode() + b'\0' +
+                struct.pack("<Q", self._offset) +
+                content)
+
+    def _encode_eod(self):
+        return struct.pack("<I", self._msg_id) + struct.pack("<I", 0)
+
+    @utils.retry_on_error()
+    def _send_msg(self, data):
+        self._msg_id += 1
+        self._stdin.write(data)
+        self._stdin.flush()
+        # Read the 4 byte ack (the echoed message id) sent back by the
+        # remote helper after each message
+        out_msg_id = self._stdout.read(4)
+
+    def _open(self):
+        self._connect_ssh()
+        self._copy_helper_cmd()
+        self._exec_helper_cmd()
+
+    def seek(self, pos):
+        self._offset = pos
+
+    def truncate(self, size):
+        # No-op: the remote helper extends the target file as needed
+        # when writing past its current end
+        pass
+
+    def write(self, data):
+        self._send_msg(self._encode_data(data))
+        self._offset += len(data)
+
+    def close(self):
+        self._send_msg(self._encode_eod())
+        ret_val = self._stdout.channel.recv_exit_status()
+        if ret_val:
+            raise exception.CoriolisException(
+                "An exception occurred while writing data on target")
+        self._ssh.close()
+
+
+class ExportProvider(base.BaseReplicaExportProvider):
 
     connection_info_schema = schemas.get_schema(
         __name__, schemas.PROVIDER_CONNECTION_INFO_SCHEMA_NAME)
@@ -54,14 +194,33 @@ class ExportProvider(base.BaseExportProvider):
             raise exception.CoriolisException(task.info.error.msg)
 
     @utils.retry_on_error()
-    def _connect(self, host, username, password, port, context):
+    @contextlib.contextmanager
+    def _connect(self, connection_info):
+        host = connection_info["host"]
+        port = connection_info.get("port", 443)
+        username = connection_info["username"]
+        password = connection_info["password"]
+        allow_untrusted = connection_info.get("allow_untrusted", False)
+
+        # pyVmomi locks otherwise
+        sys.modules['socket'] = eventlet.patcher.original('socket')
+        ssl = eventlet.patcher.original('ssl')
+
+        context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+        if allow_untrusted:
+            context.verify_mode = ssl.CERT_NONE
+
         LOG.info("Connecting to: %s:%s" % (host, port))
-        return connect.SmartConnect(
+        si = connect.SmartConnect(
             host=host,
             user=username,
             pwd=password,
             port=port,
             sslContext=context)
+        try:
+            yield context, si
+        finally:
+            connect.Disconnect(si)
 
     def _wait_for_vm_status(self, vm, status, max_wait=120):
         i = 0
@@ -337,7 +496,8 @@ class ExportProvider(base.BaseExportProvider):
 
     @utils.retry_on_error()
     def _backup_snapshot_disks(self, snapshot, export_path, connection_info,
-                               context, disk_paths):
+                               context, disk_paths, backup_writer,
+                               incremental):
         vm = snapshot.vm
         vmx_spec = "moref=%s" % vm._GetMoId()
         snapshot_ref = snapshot._GetMoId()
@@ -349,34 +509,31 @@ class ExportProvider(base.BaseExportProvider):
                                       vmx_spec, snapshot_ref) as conn:
             for disk in [d for d in snapshot.config.hardware.device
                          if isinstance(d, vim.vm.device.VirtualDisk)]:
+                change_id = '*'
 
                 l = [d for d in disk_paths if d['id'] == disk.key]
                 if l:
                     disk_path = l[0]
-                    change_id = disk_path["change_id"]
+                    if incremental:
+                        change_id = disk_path["change_id"]
                     path = disk_path["path"]
-                    disk_path['change_id'] = disk.backing.changeId
                 else:
-                    change_id = '*'
                     path = os.path.join(export_path, "disk-%s.raw" % disk.key)
-                    disk_paths.append({
+                    disk_path = {
                         'path': path,
                         'id': disk.key,
-                        'format': constants.DISK_FORMAT_RAW,
-                        'change_id': disk.backing.changeId})
+                        'format': constants.DISK_FORMAT_RAW}
+                    disk_paths.append(disk_path)
 
                 LOG.debug("CBT change id: %s", change_id)
                 changed_disk_areas = vm.QueryChangedDiskAreas(
-                    snapshot_ref, disk.key, pos, change_id)
+                    snapshot, disk.key, pos, change_id)
 
                 backup_disk_path = disk.backing.fileName
                 with vixdisklib.open(
                         conn, backup_disk_path) as disk_handle:
 
-                    # Create file if it doesn't exist
-                    open(path, "ab").close()
-
-                    with open(path, "rb+") as f:
+                    with backup_writer.open(path, disk.key) as f:
                         # Create a sparse file
                         f.truncate(disk.capacityInBytes)
 
@@ -401,6 +558,8 @@ class ExportProvider(base.BaseExportProvider):
 
                                 f.write(buf.raw)
 
+                disk_path['change_id'] = disk.backing.changeId
+
     def _backup_disks(self, vm, export_path, connection_info, context):
         if not vm.config.changeTrackingEnabled:
             raise exception.CoriolisException("Change Tracking not enabled")
@@ -413,7 +572,7 @@ class ExportProvider(base.BaseExportProvider):
             with self._take_temp_vm_snapshot(vm, snapshot_name) as snapshot:
                 self._backup_snapshot_disks(
                     snapshot, export_path, connection_info, context,
-                    disk_paths)
+                    disk_paths, _FileBackupWriter(), incremental=False)
 
             self._shutdown_vm(vm)
 
@@ -422,7 +581,7 @@ class ExportProvider(base.BaseExportProvider):
             with self._take_temp_vm_snapshot(vm, snapshot_name) as snapshot:
                 self._backup_snapshot_disks(
                     snapshot, export_path, connection_info, context,
-                    disk_paths)
+                    disk_paths, _FileBackupWriter(), incremental=True)
 
             return disk_paths
         finally:
@@ -430,42 +589,21 @@ class ExportProvider(base.BaseExportProvider):
 
     def export_instance(self, ctxt, connection_info, instance_name,
                         export_path):
-        host = connection_info["host"]
-        port = connection_info.get("port", 443)
-        username = connection_info["username"]
-        password = connection_info["password"]
-        allow_untrusted = connection_info.get("allow_untrusted", False)
-
-        backup_disks = False
-
-        # pyVmomi locks otherwise
-        sys.modules['socket'] = eventlet.patcher.original('socket')
-        ssl = eventlet.patcher.original('ssl')
-
-        context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
-        if allow_untrusted:
-            context.verify_mode = ssl.CERT_NONE
-
-        self._event_manager.set_total_progress_steps(4)
-
         self._event_manager.progress_update("Connecting to vSphere host")
-        si = self._connect(host, username, password, port, context)
-        try:
+        with self._connect(connection_info) as (context, si):
             self._event_manager.progress_update(
                 "Retrieving virtual machine data")
             vm_info, vm = self._get_vm_info(si, instance_name)
-            self._event_manager.progress_update("Exporting disks")
 
             # Take advantage of CBT if available
             backup_disks = vm.config.changeTrackingEnabled
 
+            self._event_manager.progress_update("Exporting disks")
             if backup_disks:
                 disk_paths = self._backup_disks(
                     vm, export_path, connection_info, context)
             else:
                 disk_paths = self._export_disks(vm, export_path, context)
-        finally:
-            connect.Disconnect(si)
 
         if not backup_disks:
             self._event_manager.progress_update(
@@ -486,3 +624,65 @@ class ExportProvider(base.BaseExportProvider):
             disk_info["path"] = os.path.abspath(disk_path["path"])
 
         return vm_info
+
+    def get_replica_instance_info(self, ctxt, connection_info, instance_name):
+        self._event_manager.progress_update("Connecting to vSphere host")
+        with self._connect(connection_info) as (context, si):
+            self._event_manager.progress_update(
+                "Retrieving virtual machine data")
+            vm_info, vm = self._get_vm_info(si, instance_name)
+
+            if not vm.config.changeTrackingEnabled:
+                raise exception.CoriolisException(
+                    "Changed Block Tracking must be enabled in order to "
+                    "replicate a VM")
+
+        return vm_info
+
+    def shutdown_instance(self, ctxt, connection_info, instance_name):
+        self._event_manager.progress_update("Connecting to vSphere host")
+        with self._connect(connection_info) as (context, si):
+            vm = self._get_vm(si, instance_name)
+            self._shutdown_vm(vm)
+
+    def replicate_disks(self, ctxt, connection_info, instance_name,
+                        target_conn_info, volumes_info, incremental):
+        ip = target_conn_info["ip"]
+        port = target_conn_info.get("port", 22)
+        username = target_conn_info["username"]
+        pkey = target_conn_info.get("pkey")
+        password = target_conn_info.get("password")
+
+        LOG.info("Waiting for connectivity on host: %(ip)s:%(port)s",
+                 {"ip": ip, "port": port})
+        utils.wait_for_port_connectivity(ip, port)
+
+        with self._connect(connection_info) as (context, si):
+            vm = self._get_vm(si, instance_name)
+
+            backup_writer = _SSHBackupWriter(
+                ip, port, username, pkey, password, volumes_info)
+
+            snapshot_name = str(uuid.uuid4())
+            disk_paths = []
+            for volume_info in volumes_info:
+                disk_paths.append(
+                    {"id": volume_info["disk_id"],
+                     "change_id": volume_info.get("change_id", "*"),
+                     "path": ""})
+
+            with self._take_temp_vm_snapshot(vm, snapshot_name) as snapshot:
+                vixdisklib.init()
+                try:
+                    self._backup_snapshot_disks(
+                        snapshot, "", connection_info, context,
+                        disk_paths, backup_writer, incremental)
+                finally:
+                    vixdisklib.exit()
+
+            for volume_info in volumes_info:
+                change_id = [d["change_id"] for d in disk_paths
+                             if d["id"] == volume_info["disk_id"]][0]
+                volume_info["change_id"] = change_id
+
+        return volumes_info
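
The framing used between _SSHBackupWriter and the write_data helper (added under resources/ below) can be checked in isolation. This standalone sketch mirrors _encode_data and the helper's parsing logic; encode_write and decode_write are hypothetical names, and all fields are little-endian, matching the struct.pack("<I", ...) calls above.

import struct

def encode_write(msg_id, path, offset, content):
    # <msg_id:u32> <data_len:u32> <path>\0 <offset:u64> <content>
    payload = path.encode() + b"\0" + struct.pack("<Q", offset) + content
    return (struct.pack("<I", msg_id) +
            struct.pack("<I", len(payload)) + payload)

def decode_write(msg):
    msg_id, data_len = struct.unpack_from("<II", msg)
    payload = msg[8:8 + data_len]
    path, rest = payload.split(b"\0", 1)
    offset = struct.unpack_from("<Q", rest)[0]
    return msg_id, path.decode(), offset, rest[8:]

msg = encode_write(1, "/dev/vdb", 4096, b"\xde\xad")
assert decode_write(msg) == (1, "/dev/vdb", 4096, b"\xde\xad")
# End-of-data is a header with a zero length: struct.pack("<II", msg_id, 0)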

+ 9 - 0
coriolis/utils.py

@@ -3,6 +3,7 @@
 
 import functools
 import json
+import os
 import re
 import socket
 import subprocess
@@ -236,3 +237,11 @@ def get_ssl_cert_thumbprint(context, host, port=443, digest_algorithm="sha1"):
 
     x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert)
     return x509.digest('sha1').decode()
+
+
+def _get_base_dir():
+    return os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+
+
+def get_resources_dir():
+    return os.path.join(_get_base_dir(), "resources")

+ 127 - 0
resources/write_data.c

@@ -0,0 +1,127 @@
+// Copyright 2016 Cloudbase Solutions Srl
+// All Rights Reserved.
+
+#include <stdio.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+
+#define MIN_MSG_SIZE (sizeof(uint64_t) + 1)
+#define MAX_MSG_SIZE (100 * 1024 * 1024)
+
+#define ERR_MORE_MSG        -1
+#define ERR_DONE            0
+#define ERR_READ_MSG_SIZE   1
+#define ERR_MSG_SIZE        2
+#define ERR_OPEN_FILE       3
+#define ERR_DATA            4
+#define ERR_IO_OPEN         5
+#define ERR_IO_SEEK         6
+#define ERR_IO_WRITE        7
+#define ERR_IO_CLOSE        8
+#define ERR_NO_MEM          9
+#define ERR_INVALID_ARGS    10
+#define ERR_READ_MSG_ID     11
+
+int write_msg_id(uint32_t msg_id)
+{
+    size_t c = fwrite(&msg_id, 1, sizeof(msg_id), stdout);
+    if (c != sizeof(msg_id))
+        return ERR_IO_WRITE;
+    if(fflush(stdout))
+        return ERR_IO_WRITE;
+    return ERR_DONE;
+}
+
+int handle_msg(FILE* input_stream)
+{
+    uint32_t msg_id = 0;
+    size_t c = fread(&msg_id, 1, sizeof(uint32_t), input_stream);
+    if (c != sizeof(uint32_t))
+        return ERR_READ_MSG_ID;
+
+    uint32_t msg_size = 0;
+    c = fread(&msg_size, 1, sizeof(uint32_t), input_stream);
+    if (c != sizeof(uint32_t))
+        return ERR_READ_MSG_SIZE;
+    if (!msg_size)
+    {
+        int err = write_msg_id(msg_id);
+        if(err)
+            return err;
+        return ERR_DONE;
+    }
+    if (msg_size < MIN_MSG_SIZE || msg_size > MAX_MSG_SIZE)
+        return ERR_MSG_SIZE;
+
+    unsigned char* buf = (unsigned char*)malloc(msg_size);
+    if (!buf)
+        return ERR_NO_MEM;
+
+    c = fread(buf, 1, msg_size, input_stream);
+    if (c != msg_size)
+        return ERR_DATA;
+
+    char* path = (char*)buf;
+    // use memchr rather than strlen: the buffer is not guaranteed to
+    // contain a NUL terminator
+    unsigned char* data = (unsigned char*)memchr(path, '\0', msg_size);
+    if (!data)
+        return ERR_DATA;
+    data++;
+
+    uint64_t offset = *((uint64_t*)data);
+    data += sizeof(uint64_t);
+
+    // Create an empty file in case it does not exist
+    FILE* f = fopen(path, "ab+");
+    if (!f)
+        return ERR_OPEN_FILE;
+    if (fclose(f))
+        return ERR_IO_CLOSE;
+
+    // Use rb+ to allow fseek when writing
+    f = fopen(path, "rb+");
+    if (!f)
+        return ERR_OPEN_FILE;
+    if (fseek(f, (long)offset, SEEK_SET))
+        return ERR_IO_SEEK;
+
+    size_t data_size = msg_size - (data - buf);
+    c = fwrite(data, 1, data_size, f);
+    if (c != data_size)
+        return ERR_IO_WRITE;
+    if (fclose(f))
+        return ERR_IO_CLOSE;
+
+    // TODO: free also in case of errors
+    free(buf);
+
+    int err = write_msg_id(msg_id);
+    if(err)
+        return err;
+
+    return ERR_MORE_MSG;
+}
+
+int main(int argc, char **argv)
+{
+    FILE* input_stream = NULL;
+    if (argc == 2)
+    {
+        char* input_path = argv[1];
+        if (!(input_stream = fopen(input_path, "rb")))
+            return ERR_IO_OPEN;
+    }
+    else if (argc == 1)
+        input_stream = stdin;
+    else
+        return ERR_INVALID_ARGS;
+
+    int err;
+    while ((err = handle_msg(input_stream)) == ERR_MORE_MSG);
+
+    if (input_stream != stdin)
+        fclose(input_stream);
+
+    return err;
+}