Parcourir la source

Adds osmorphing

Alessandro Pilotti il y a 10 ans
Parent
commit
744af4e9a1

+ 8 - 0
coriolis/constants.py

@@ -17,3 +17,11 @@ DISK_FORMAT_RAW = 'raw'
 DISK_FORMAT_QCOW2 = 'qcow2'
 DISK_FORMAT_VHD = 'vhd'
 DISK_FORMAT_VHDX = 'vhdx'
+
# Hypervisor type identifiers, used to select guest tools/packages
# during OS morphing.
HYPERVISOR_VMWARE = "vmware"
HYPERVISOR_HYPERV = "hyperv"
HYPERVISOR_KVM = "kvm"
HYPERVISOR_XENSERVER = "xenserver"

# Platform identifiers, used as keys by the provider factory.
PLATFORM_OPENSTACK = "openstack"
PLATFORM_VMWARE_VSPHERE = "vmware_vsphere"

+ 0 - 0
coriolis/osmorphing/__init__.py


+ 10 - 0
coriolis/osmorphing/factory.py

@@ -0,0 +1,10 @@
+from coriolis.osmorphing import ubuntu
+
+
def get_os_morphing_tools(ssh, os_root_dir):
    """Return a morphing tools instance matching the mounted OS image.

    Each candidate class inspects the mounted root filesystem; the
    first one that recognizes the OS is instantiated and returned.

    :raises Exception: when no tools class recognizes the image
    """
    for tools_cls in (ubuntu.UbuntuOSMorphingTools,):
        if tools_cls.check_os(ssh, os_root_dir):
            return tools_cls(os_root_dir)
    raise Exception("Cannot find the morphing tools for this OS image")

+ 41 - 0
coriolis/osmorphing/manager.py

@@ -0,0 +1,41 @@
+from oslo_log import log as logging
+import paramiko
+
+from coriolis.osmorphing import factory as osmorphing_factory
+from coriolis.osmorphing.osmount import factory as osmount_factory
+from coriolis import utils
+
+LOG = logging.getLogger(__name__)
+
+
def morph_image(connection_info, target_hypervisor, target_platform,
                volume_devs):
    """Mount the guest OS found on volume_devs and adapt it to the target.

    Connects over SSH to the migration worker VM, mounts the guest's
    root filesystem, switches networking to DHCP and installs/removes
    the packages appropriate for the target hypervisor and platform.

    :param connection_info: (ip, port, username, pkey) of the worker VM
    :param target_hypervisor: one of constants.HYPERVISOR_*
    :param target_platform: one of constants.PLATFORM_*
    :param volume_devs: block device paths holding the guest's disks
    """
    (ip, port, username, pkey) = connection_info

    # Bug fix: the original passed a single tuple to a format string
    # with two placeholders, which breaks the lazy log formatting.
    LOG.info("Waiting for connectivity on host: %s:%s", ip, port)
    utils.wait_for_port_connectivity(ip, port)

    LOG.info("Connecting to host: %s:%s", ip, port)
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(hostname=ip, port=port, username=username, pkey=pkey)

    try:
        os_mount_tools = osmount_factory.get_os_mount_tools(ssh)
        os_root_dir = os_mount_tools.mount_os(ssh, volume_devs)
        os_morphing_tools = osmorphing_factory.get_os_morphing_tools(
            ssh, os_root_dir)

        os_morphing_tools.set_dhcp(ssh)

        (packages_add,
         packages_remove) = os_morphing_tools.get_packages(target_hypervisor,
                                                           target_platform)
        os_morphing_tools.update_packages_list(ssh)

        if packages_add:
            LOG.info("Adding packages: %s" % str(packages_add))
            os_morphing_tools.install_packages(ssh, packages_add)

        if packages_remove:
            # Bug fix: log the packages being removed, not the added ones.
            LOG.info("Removing packages: %s" % str(packages_remove))
            os_morphing_tools.uninstall_packages(ssh, packages_remove)
    finally:
        # Bug fix: don't leak the SSH connection.
        ssh.close()

+ 0 - 0
coriolis/osmorphing/osmount/__init__.py


+ 10 - 0
coriolis/osmorphing/osmount/factory.py

@@ -0,0 +1,10 @@
+from coriolis.osmorphing.osmount import ubuntu
+
+
def get_os_mount_tools(ssh):
    """Return a mount tools instance matching the remote host's OS.

    :raises Exception: when no tools class recognizes the OS
    """
    for tools_cls in (ubuntu.UbuntuOSMountTools,):
        if tools_cls.check_os(ssh):
            return tools_cls()
    raise Exception("OS mount tools not found")

+ 73 - 0
coriolis/osmorphing/osmount/ubuntu.py

@@ -0,0 +1,73 @@
+import re
+
+from coriolis import utils
+
+
class UbuntuOSMountTools(object):
    """Mounts a guest root filesystem from attached volume devices.

    Runs over SSH on an Ubuntu worker VM: partitions are scanned, LVM
    volume groups are activated, and the guest's root filesystem (plus
    the usual bind mounts) is mounted so the OS can be morphed via
    chroot.
    """

    @staticmethod
    def check_os(ssh):
        # Returns True when the remote host runs Ubuntu, None otherwise.
        os_info = utils.get_linux_os_info(ssh)
        if os_info and os_info[0] == 'Ubuntu':
            return True

    def _get_vgnames(self, ssh):
        """Return the names of all LVM volume groups found on the host."""
        vg_names = []
        # Drop the first line (vgscan banner) and the trailing empty
        # element produced by the final newline.
        vgscan_out_lines = utils.exec_ssh_cmd(
            ssh, "sudo vgscan").decode().split('\n')[1:-1]
        for vgscan_out_line in vgscan_out_lines:
            m = re.match(
                r'\s*Found volume group "(.*)" using metadata type lvm2',
                vgscan_out_line)
            if m:
                vg_names.append(m.groups()[0])
        return vg_names

    def mount_os(self, ssh, volume_devs):
        """Mount the OS root filesystem found on the given devices.

        :param ssh: connected paramiko SSHClient to the worker VM
        :param volume_devs: list of block device paths (e.g. /dev/sdb)
        :return: temporary directory where the root FS is mounted
        :raises Exception: if no root partition could be identified
        """
        dev_paths = []

        for volume_dev in volume_devs:
            # "|| true": partx fails on disks without a partition table.
            utils.exec_ssh_cmd(
                ssh,  "sudo partx -v -a %s || true" % volume_dev)
            dev_paths += utils.exec_ssh_cmd(
                ssh, "sudo ls %s*" % volume_dev).decode().split('\n')[:-1]

        # lvm2 is needed on the worker to activate LVM volumes found
        # on the guest's disks.
        utils.exec_ssh_cmd(ssh, "sudo apt-get update -y")
        utils.exec_ssh_cmd(ssh, "sudo apt-get install lvm2 -y")
        utils.exec_ssh_cmd(ssh, "sudo modprobe dm-mod")

        for vg_name in self._get_vgnames(ssh):
            utils.exec_ssh_cmd(ssh, "sudo vgchange -ay %s" % vg_name)
            lvm_dev_paths = utils.exec_ssh_cmd(
                ssh, "sudo ls /dev/%s/*" % vg_name).decode().split('\n')[:-1]
            dev_paths += lvm_dev_paths

        # Only ext2/3/4 filesystems are considered root candidates.
        dev_paths_to_mount = []
        for dev_path in dev_paths:
            fs_type = utils.exec_ssh_cmd(
                ssh, "sudo blkid -o value -s TYPE %s || true" %
                dev_path).decode().split('\n')[0]
            if fs_type in ['ext2', 'ext3', 'ext4']:
                dev_paths_to_mount.append(dev_path)

        os_root_dir = None
        for dev_path in dev_paths_to_mount:
            tmp_dir = utils.exec_ssh_cmd(
                ssh,  'mktemp -d').decode().split('\n')[0]
            utils.exec_ssh_cmd(ssh, 'sudo mount %s %s' % (dev_path, tmp_dir))
            dirs = utils.exec_ssh_cmd(
                ssh, 'ls %s' % tmp_dir).decode().split('\n')
            # TODO: better ways to check for a linux root?
            if 'etc' in dirs and 'bin' in dirs and 'sbin' in dirs:
                # Bind-mount the pseudo filesystems so that chroot-ed
                # commands (e.g. apt-get) behave as expected.
                for dir in set(dirs).intersection(['proc', 'sys', 'dev',
                                                   'run']):
                    utils.exec_ssh_cmd(
                        ssh,
                        'sudo mount -o bind /%(dir)s/ %(mount_dir)s/%(dir)s' %
                        {'dir': dir, 'mount_dir': tmp_dir})
                os_root_dir = tmp_dir
                break

        if not os_root_dir:
            raise Exception("root partition not found")

        return os_root_dir

+ 65 - 0
coriolis/osmorphing/ubuntu.py

@@ -0,0 +1,65 @@
+import itertools
+import os
+import re
+
+from coriolis import constants
+from coriolis import utils
+
+
class UbuntuOSMorphingTools(object):
    """Adapts a mounted Ubuntu image to a target hypervisor/platform."""

    # Maps (hypervisor, platform) -> [(package_name, remove_flag)].
    # None in a key acts as a wildcard matching any hypervisor or
    # platform. Packages in matching entries are installed; packages in
    # non-matching entries with remove_flag True are uninstalled.
    _packages = {
        (constants.HYPERVISOR_VMWARE, None): [("open-vm-tools", True)],
        # TODO: sudo agt-get install linux-tool-<kernel release>
        # linux-cloud-tools-<kernel release> -y
        (constants.HYPERVISOR_HYPERV, None): [("hv-kvp-daemon-init", True)],
        (None, constants.PLATFORM_OPENSTACK): [("cloud-init", True)],
    }

    def __init__(self, os_root_dir):
        # Directory where the guest's root filesystem is mounted.
        self._os_root_dir = os_root_dir

    @staticmethod
    def check_os(ssh, os_root_dir):
        """Return True if the mounted image is an Ubuntu installation."""
        lsb_release_path = os.path.join(os_root_dir, "etc/lsb-release")
        # "|| true": the file is absent on non-Ubuntu images.
        out = utils.exec_ssh_cmd(
            ssh, "cat %s || true" % lsb_release_path).decode()

        dist_id = re.findall('^DISTRIB_ID=(.*)$', out, re.MULTILINE)
        release = re.findall('^DISTRIB_RELEASE=(.*)$', out, re.MULTILINE)
        # TODO: validate release as well
        if 'Ubuntu' in dist_id:
            return True

    def set_dhcp(self, ssh):
        """Switch statically configured interfaces to DHCP.

        sed keeps a .bak copy of the original configuration file.
        """
        interfaces_path = os.path.join(
            self._os_root_dir, "etc/network/interfaces")
        utils.exec_ssh_cmd(
            ssh, 'sudo sed -i.bak "s/static/dhcp/g" %s' % interfaces_path)

    def get_packages(self, hypervisor, platform):
        """Return (to_install, to_remove) package name lists.

        :param hypervisor: constants.HYPERVISOR_* value or None
        :param platform: constants.PLATFORM_* value or None
        """
        # Keys matching the target; None components act as wildcards.
        k_add = [(h, p) for (h, p) in self._packages.keys() if
                 (h is None or h == hypervisor) and
                 (p is None or p == platform)]

        add = [p[0] for p in itertools.chain.from_iterable(
               [l for k, l in self._packages.items() if k in k_add])]

        # Non-matching entries whose remove_flag (p[1]) is set.
        k_remove = set(self._packages.keys()) - set(k_add)
        remove = [p[0] for p in itertools.chain.from_iterable(
                  [l for k, l in self._packages.items() if k in k_remove])
                  if p[1]]

        return add, remove

    def update_packages_list(self, ssh):
        """Refresh the apt package index inside the image's chroot."""
        apt_get_cmd = 'sudo apt-get update -y'
        utils.exec_ssh_cmd_chroot(ssh, self._os_root_dir, apt_get_cmd)

    def install_packages(self, ssh, package_names):
        """Install the given packages inside the image's chroot."""
        apt_get_cmd = 'sudo apt-get install %s -y' % " ".join(package_names)
        utils.exec_ssh_cmd_chroot(ssh, self._os_root_dir, apt_get_cmd)

    def uninstall_packages(self, ssh, package_names):
        """Remove the given packages; "|| true" ignores failures."""
        for package_name in package_names:
            apt_get_cmd = 'sudo apt-get remove %s -y || true' % package_name
            utils.exec_ssh_cmd_chroot(ssh, self._os_root_dir, apt_get_cmd)

+ 6 - 6
coriolis/providers/factory.py

@@ -5,20 +5,20 @@ from coriolis.providers import vmware_vsphere
 
 
 EXPORT_PROVIDERS = {
-    "vmware_vsphere": vmware_vsphere.ExportProvider
+    constants.PLATFORM_VMWARE_VSPHERE: vmware_vsphere.ExportProvider
 }
 
 IMPORT_PROVIDERS = {
-    "openstack": openstack.ImportProvider
+    constants.PLATFORM_OPENSTACK: openstack.ImportProvider
 }
 
 
def get_provider(platform_name, provider_type):
    """Return a provider instance for the given platform and direction.

    :param platform_name: a constants.PLATFORM_* value
    :param provider_type: constants.PROVIDER_TYPE_EXPORT or _IMPORT
    :raises exception.NotFound: when no provider matches
    """
    # Bug fix: an unknown provider_type previously left `cls` unbound,
    # raising UnboundLocalError instead of a NotFound error.
    cls = None
    if provider_type == constants.PROVIDER_TYPE_EXPORT:
        cls = EXPORT_PROVIDERS.get(platform_name)
    elif provider_type == constants.PROVIDER_TYPE_IMPORT:
        cls = IMPORT_PROVIDERS.get(platform_name)

    if not cls:
        raise exception.NotFound("Provider not found: %s" % platform_name)
    return cls()

+ 339 - 1
coriolis/providers/openstack/__init__.py

@@ -1,12 +1,95 @@
+import math
+import os
+import time
+import uuid
+
 from cinderclient import client as cinder_client
 from glanceclient import client as glance_client
 from keystoneauth1 import loading
 from keystoneauth1 import session
 from neutronclient.neutron import client as neutron_client
 from novaclient import client as nova_client
+from oslo_log import log as logging
+from oslo_utils import units
+import paramiko
 
-
+from coriolis import constants
+from coriolis.osmorphing import manager as osmorphing_manager
 from coriolis.providers import base
+from coriolis import utils
+
# API versions used when instantiating the OpenStack clients.
NOVA_API_VERSION = 2
GLANCE_API_VERSION = 1
NEUTRON_API_VERSION = '2.0'
CINDER_API_VERSION = 2

# Naming template for temporary migration resources (%s: a UUID).
MIGRATION_TMP_FORMAT = "migration_tmp_%s"
DEFAULT_CONTAINER_FORMAT = "bare"

# Extra space added to volume sizes for non-raw images, to account
# for the image format's metadata overhead.
DISK_HEADER_SIZE = 10 * units.Mi

SSH_PORT = 22

# cloud-init user-data template creating the temporary migration user
# with passwordless sudo; formatted with (username, ssh public key).
MIGR_USER_DATA = (
    "#cloud-config\n"
    "users:\n"
    "  - name: %s\n"
    "    ssh-authorized-keys:\n"
    "      - %s\n"
    "    sudo: ['ALL=(ALL) NOPASSWD:ALL']\n"
    "    groups: sudo\n"
    "    shell: /bin/bash\n"
)
MIGR_GUEST_USERNAME = 'cloudbase'

LOG = logging.getLogger(__name__)
+
+
class _MigrationResources(object):
    """Groups the temporary OpenStack resources used during a migration.

    Tracks the worker instance, its port, floating IP, security group
    and keypair, together with the paramiko private key used to SSH
    into the guest, so they can be disposed of together once the
    migration completes.
    """

    def __init__(self, nova, neutron, keypair, instance, port,
                 floating_ip, sec_group, k):
        self._nova = nova
        self._neutron = neutron
        self._instance = instance
        self._port = port
        self._floating_ip = floating_ip
        self._sec_group = sec_group
        self._keypair = keypair
        # Paramiko RSA private key matching the instance's keypair.
        self._k = k

    def get_guest_connection_info(self):
        """Return (ip, port, username, pkey) for SSH-ing into the guest."""
        return (self._floating_ip.ip, SSH_PORT, MIGR_GUEST_USERNAME, self._k)

    @utils.retry_on_error()
    def _wait_for_instance_deletion(self, instance_id):
        """Poll until the instance disappears or enters ERROR."""
        instances = self._nova.servers.findall(id=instance_id)
        while instances and instances[0].status != 'ERROR':
            time.sleep(2)
            instances = self._nova.servers.findall(id=instance_id)
        if instances:
            # Deletion failed: the instance is stuck in ERROR.
            raise Exception("VM is in status: %s" % instances[0].status)

    def get_instance(self):
        # The temporary worker instance (or None once deleted).
        return self._instance

    @utils.retry_on_error()
    def delete(self):
        """Delete all temporary resources.

        Each attribute is reset to None once released so a retried call
        (via retry_on_error) only deletes what is still outstanding.
        The instance is removed first since the port and security group
        cannot be deleted while attached to it.
        """
        if self._instance:
            self._nova.servers.delete(self._instance)
            self._wait_for_instance_deletion(self._instance.id)
            self._instance = None
        if self._floating_ip:
            self._nova.floating_ips.delete(self._floating_ip)
            self._floating_ip = None
        if self._port:
            self._neutron.delete_port(self._port['id'])
            self._port = None
        if self._sec_group:
            self._nova.security_groups.delete(self._sec_group.id)
            self._sec_group = None
        if self._keypair:
            self._nova.keypairs.delete(self._keypair.name)
            self._keypair = None
 
 
 class ImportProvider(base.BaseExportProvider):
@@ -47,6 +130,261 @@ class ImportProvider(base.BaseExportProvider):
 
         return session.Session(auth=auth, verify=verify)
 
+    @utils.retry_on_error()
+    def _create_image(self, glance, name, disk_path, disk_format,
+                      container_format, hypervisor_type):
+        with open(disk_path, 'rb') as f:
+            return glance.images.create(
+                name=name,
+                disk_format=disk_format,
+                container_format=container_format,
+                properties={"hypervisor_type": hypervisor_type},
+                data=f)
+
+    @utils.retry_on_error()
+    def _wait_for_volume(self, nova, volume, expected_status='in-use'):
+        volume = nova.volumes.findall(id=volume.id)[0]
+        while volume.status not in [expected_status, 'error']:
+            time.sleep(2)
+            volume = nova.volumes.get(volume.id)
+        if volume.status != expected_status:
+            raise Exception("Volume is in status: %s" % volume.status)
+
+    @utils.retry_on_error()
+    def _wait_for_instance(self, nova, instance, expected_status='ACTIVE'):
+        instance = nova.servers.get(instance.id)
+        while instance.status not in [expected_status, 'ERROR']:
+            time.sleep(2)
+            instance = nova.servers.get(instance.id)
+        if instance.status != expected_status:
+            raise Exception("VM is in status: %s" % instance.status)
+
+    def _get_unique_name(self):
+        return MIGRATION_TMP_FORMAT % str(uuid.uuid4())
+
+    @utils.retry_on_error()
+    def _create_neutron_port(self, neutron, network_name, mac_address=None):
+        networks = neutron.list_networks(name=network_name)
+        network_id = networks['networks'][0]['id']
+
+        # make sure that the port is not already existing from a previous
+        # migration attempt
+        if mac_address:
+            ports = neutron.list_ports(
+                mac_address=mac_address).get('ports', [])
+            if ports:
+                neutron.delete_port(ports[0]['id'])
+
+        body = {"port": {
+                "network_id": network_id,
+                }}
+        if mac_address:
+            body["port"]["mac_address"] = mac_address
+
+        return neutron.create_port(body=body)['port']
+
+    @utils.retry_on_error()
+    def _create_keypair(self, nova, name, public_key):
+        if nova.keypairs.findall(name=name):
+            nova.keypairs.delete(name)
+        return nova.keypairs.create(name=name, public_key=public_key)
+
    @utils.retry_on_error()
    def _deploy_migration_resources(self, nova, glance, neutron,
                                    migr_image_name, migr_flavor_name,
                                    migr_network_name, migr_fip_pool_name):
        """Boot the temporary worker VM used to morph the guest's disks.

        Creates a throwaway keypair, port, instance, floating IP and
        an SSH-only security group, returning them bundled in a
        _MigrationResources. On any failure, everything created so far
        is deleted and the exception re-raised.
        """
        if not glance.images.findall(name=migr_image_name):
            raise Exception("Glance image \"%s\" not found" % migr_image_name)

        image = nova.images.find(name=migr_image_name)
        flavor = nova.flavors.find(name=migr_flavor_name)

        # Track what has been created so the except block can clean up.
        keypair = None
        instance = None
        floating_ip = None
        sec_group = None
        port = None

        try:
            migr_keypair_name = self._get_unique_name()

            # Fresh keypair per migration; the private key stays in
            # this process and is handed to _MigrationResources.
            k = paramiko.RSAKey.generate(2048)
            public_key = "ssh-rsa %s tmp@migration" % k.get_base64()
            keypair = self._create_keypair(nova, migr_keypair_name, public_key)

            port = self._create_neutron_port(neutron, migr_network_name)
            # cloud-init user-data creates the migration user with the
            # generated key and passwordless sudo.
            userdata = MIGR_USER_DATA % (MIGR_GUEST_USERNAME, public_key)
            instance = nova.servers.create(
                name=self._get_unique_name(),
                image=image,
                flavor=flavor,
                key_name=migr_keypair_name,
                userdata=userdata,
                nics=[{'port-id': port['id']}])

            floating_ip = nova.floating_ips.create(pool=migr_fip_pool_name)
            self._wait_for_instance(nova, instance, 'ACTIVE')

            LOG.info("Floating IP: %s", floating_ip.ip)
            instance.add_floating_ip(floating_ip)

            migr_sec_group_name = self._get_unique_name()
            sec_group = nova.security_groups.create(
                name=migr_sec_group_name, description=migr_sec_group_name)
            # Only SSH access is needed on the worker VM.
            nova.security_group_rules.create(
                sec_group.id,
                ip_protocol="tcp",
                from_port=SSH_PORT,
                to_port=SSH_PORT)
            instance.add_security_group(sec_group.id)

            return _MigrationResources(nova, neutron, keypair, instance, port,
                                       floating_ip, sec_group, k)
        except:
            # Best-effort cleanup of the partially created resources,
            # then re-raise the original exception.
            if instance:
                nova.servers.delete(instance)
            if floating_ip:
                nova.floating_ips.delete(floating_ip)
            if port:
                neutron.delete_port(port['id'])
            if sec_group:
                nova.security_groups.delete(sec_group.id)
            if keypair:
                nova.keypairs.delete(keypair.name)
            raise
+
+    @utils.retry_on_error()
+    def _attach_volume(self, nova, instance, volume, volume_dev):
+        nova.volumes.create_server_volume(
+            instance.id, volume.id, volume_dev)
+        self._wait_for_volume(nova, volume, 'in-use')
+
    def import_instance(self, connection_info, target_environment,
                        instance_name, export_info):
        """Import an exported VM into OpenStack.

        Uploads the exported disks to Glance, converts them into
        volumes, boots a temporary worker VM that morphs the guest OS
        for the target, then boots the migrated instance from the
        volumes.
        """
        session = self._create_keystone_session(connection_info)

        nova = nova_client.Client(NOVA_API_VERSION, session=session)
        glance = glance_client.Client(GLANCE_API_VERSION, session=session)
        neutron = neutron_client.Client(NEUTRON_API_VERSION, session=session)
        # NOTE(review): the cinder client is created but never used below.
        cinder = cinder_client.Client(CINDER_API_VERSION, session=session)

        glance_upload = target_environment.get("glance_upload", False)
        target_disk_format = target_environment.get(
            "disk_format", constants.DISK_FORMAT_QCOW2)
        container_format = target_environment.get(
            "container_format", DEFAULT_CONTAINER_FORMAT)
        hypervisor_type = target_environment.get("hypervisor_type")
        fip_pool_name = target_environment.get("fip_pool_name")
        network_name = target_environment.get("network_name")
        flavor_name = target_environment.get("flavor_name")
        keypair_name = target_environment.get("keypair_name")

        # The migration worker's settings default to the target
        # instance's flavor/network/FIP pool when not set explicitly.
        migr_image_name = target_environment.get("migr_image_name")
        migr_flavor_name = target_environment.get("migr_flavor_name",
                                                  flavor_name)
        migr_network_name = target_environment.get("migr_network_name",
                                                   network_name)
        migr_fip_pool_name = target_environment.get("migr_fip_pool_name",
                                                    fip_pool_name)

        disks_info = export_info["devices"]["disks"]

        images = []
        volumes = []
        volume_devs = []

        if glance_upload:
            for disk_info in disks_info:
                disk_path = disk_info["path"]
                disk_file_info = utils.get_disk_info(disk_path)

                # if target_disk_format == disk_file_info["format"]:
                #    target_disk_path = disk_path
                # else:
                #    target_disk_path = (
                #        "%s.%s" % (os.path.splitext(disk_path)[0],
                #                   target_disk_format))
                #    utils.convert_disk_format(disk_path, target_disk_path,
                #                              target_disk_format)

                disk_format = disk_file_info["format"]
                image = self._create_image(
                    glance, self._get_unique_name(),
                    disk_path, disk_format,
                    container_format, hypervisor_type)
                images.append(image)

                # Non-raw formats need extra space for format metadata.
                virtual_disk_size = disk_file_info["virtual-size"]
                if disk_format != constants.DISK_FORMAT_RAW:
                    virtual_disk_size += DISK_HEADER_SIZE

                volume_size_gb = math.ceil(virtual_disk_size / units.Gi)
                volume = nova.volumes.create(
                    size=volume_size_gb,
                    display_name=self._get_unique_name(),
                    imageRef=image.id)
                volumes.append(volume)

        migr_resources = self._deploy_migration_resources(
            nova, glance, neutron, migr_image_name, migr_flavor_name,
            migr_network_name, migr_fip_pool_name)

        try:
            for i, volume in enumerate(volumes):
                self._wait_for_volume(nova, volume, 'available')
                # The image is only needed to seed the volume.
                glance.images.delete(images[i].id)

                # TODO: improve device assignment
                volume_dev = "/dev/sd%s" % chr(ord('a') + i + 1)
                volume_devs.append(volume_dev)
                self._attach_volume(nova, migr_resources.get_instance(),
                                    volume, volume_dev)

                guest_conn_info = migr_resources.get_guest_connection_info()

                # NOTE(review): morph_image runs on every loop iteration
                # with the devices attached so far — confirm whether it
                # should run once, after all volumes are attached.
                osmorphing_manager.morph_image(guest_conn_info,
                                               hypervisor_type,
                                               constants.PLATFORM_OPENSTACK,
                                               volume_devs)
        finally:
            migr_resources.delete()

        ports = []
        nics_info = export_info["devices"].get("nics", [])
        for nic_info in nics_info:
            # Re-use the source VM's MAC addresses where available.
            ports.append(self._create_neutron_port(
                neutron, network_name, nic_info.get("mac_address")))

        # NOTE(review): the created instance is not returned to callers.
        instance = self._create_target_instance(
            nova, flavor_name, instance_name, keypair_name, ports, volumes,
            migr_image_name)
+
+    @utils.retry_on_error()
+    def _create_target_instance(self, nova, flavor_name, instance_name,
+                                keypair_name, ports, volumes, image_name):
+        flavor = nova.flavors.find(name=flavor_name)
+        image = nova.images.find(name=image_name)
+
+        block_device_mapping = {}
+        for i, volume in enumerate(volumes):
+            block_device_mapping['vd%s' % chr(ord('a') + i)] = volume.id
+
+        nics = [{'port-id': p['id']} for p in ports]
+
+        # Note: Nova requires an image even when booting from volume
+        LOG.info('Creating target instance...')
+        instance = nova.servers.create(
+            name=instance_name,
+            image=image,
+            flavor=flavor,
+            key_name=keypair_name,
+            block_device_mapping=block_device_mapping,
+            nics=nics)
+
+        try:
+            self._wait_for_instance(nova, instance, 'ACTIVE')
+            return instance
+        except:
+            if instance:
+                nova.servers.delete(instance)
+            raise

+ 104 - 0
coriolis/utils.py

@@ -1,7 +1,94 @@
+import functools
+import json
+import re
 import socket
 import subprocess
+import time
 import traceback
 
+from oslo_config import cfg
+from oslo_log import log as logging
+
+from coriolis import constants
+
# Configuration for locating the qemu-img tool, used below for disk
# inspection and format conversion.
opts = [
    cfg.StrOpt('qemu_img_path',
               default='qemu-img',
               help='The path of the qemu-img tool.'),
]

CONF = cfg.CONF
CONF.register_opts(opts)

LOG = logging.getLogger(__name__)
+
+
def retry_on_error(max_attempts=5, sleep_seconds=1):
    """Decorator factory retrying the wrapped callable on any exception.

    :param max_attempts: total number of attempts before re-raising
    :param sleep_seconds: delay between consecutive attempts
    """
    def _retry_on_error(func):
        @functools.wraps(func)
        def _exec_retry(*args, **kwargs):
            i = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception as ex:
                    i += 1
                    if i < max_attempts:
                        # Fix: Logger.warn is a deprecated alias of
                        # Logger.warning.
                        LOG.warning("Exception occurred, retrying: %s", ex)
                        time.sleep(sleep_seconds)
                    else:
                        raise
        return _exec_retry
    return _retry_on_error
+
+
def get_linux_os_info(ssh):
    """Return (distributor_id, release) from lsb_release over SSH.

    Returns None (implicitly) when lsb_release is unavailable or its
    output cannot be parsed.
    """
    # "|| true": lsb_release may be missing on minimal images.
    out = exec_ssh_cmd(ssh, "lsb_release -a || true").decode()
    # Fix: use raw strings so "\s" is not an (invalid/deprecated)
    # string escape sequence.
    dist_id = re.findall(r'^Distributor ID:\s(.*)$', out, re.MULTILINE)
    release = re.findall(r'^Release:\s(.*)$', out, re.MULTILINE)
    if dist_id and release:
        return (dist_id[0], release[0])
+
+
@retry_on_error()
def exec_ssh_cmd(ssh, cmd):
    """Run cmd on the remote host; return stdout bytes.

    :raises Exception: when the command exits with a non-zero status
    """
    _, stdout, stderr = ssh.exec_command(cmd)
    exit_code = stdout.channel.recv_exit_status()
    out_data = stdout.read()
    err_data = stderr.read()
    if exit_code:
        raise Exception("Command \"%s\" failed with exit code: %s\n"
                        "stdout: %s\nstd_err: %s" %
                        (cmd, exit_code, out_data, err_data))
    return out_data
+
+
def exec_ssh_cmd_chroot(ssh, chroot_dir, cmd):
    """Run cmd inside a chroot at chroot_dir on the remote host."""
    chroot_cmd = "sudo chroot %s %s" % (chroot_dir, cmd)
    return exec_ssh_cmd(ssh, chroot_cmd)
+
+
+def _check_port_open(host, port):
+    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    try:
+        s.settimeout(1)
+        s.connect((host, port))
+        return True
+    except ConnectionRefusedError:
+        return False
+    except socket.timeout:
+        return False
+    finally:
+        s.close()
+
+
+def wait_for_port_connectivity(address, port):
+    i = 0
+    while not _check_port_open(address, port) and i < 120:
+        time.sleep(1)
+        i += 1
+    if i == 120:
+        raise Exception("Connection failed on port %s" % port)
+
 
 def exec_process(args):
     p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
@@ -13,6 +100,23 @@ def exec_process(args):
     return std_out
 
 
def get_disk_info(disk_path):
    """Return qemu-img's JSON info for disk_path as a dict.

    The "vpc" format name reported by qemu-img is normalized to VHD.
    """
    args = [CONF.qemu_img_path, 'info', '--output=json', disk_path]
    disk_info = json.loads(exec_process(args).decode())
    if disk_info["format"] == "vpc":
        disk_info["format"] = constants.DISK_FORMAT_VHD
    return disk_info
+
+
def convert_disk_format(disk_path, target_disk_path, target_format):
    """Convert disk_path into target_disk_path using qemu-img."""
    # qemu-img calls the VHD format "vpc".
    if target_format == constants.DISK_FORMAT_VHD:
        qemu_format = "vpc"
    else:
        qemu_format = target_format
    exec_process([CONF.qemu_img_path, 'convert', '-O', qemu_format,
                  disk_path, target_disk_path])
+
+
 def get_hostname():
     return socket.gethostname()
 

+ 27 - 26
coriolis/worker/rpc/server.py

@@ -22,7 +22,6 @@ worker_opts = [
 CONF = cfg.CONF
 CONF.register_opts(worker_opts, 'worker')
 
-
 LOG = logging.getLogger(__name__)
 
 VERSION = "1.0"
@@ -53,8 +52,9 @@ class WorkerServerEndpoint(object):
             LOG.info("Task process not found: %s", process_id)
 
     def _exec_task_process(self, ctxt, task_id, target, args):
-        mp_q = multiprocessing.Queue()
-        p = multiprocessing.Process(target=target, args=(args + (mp_q,)))
+        mp_ctx = multiprocessing.get_context('spawn')
+        mp_q = mp_ctx.Queue()
+        p = mp_ctx.Process(target=target, args=(args + (mp_q,)))
 
         p.start()
         LOG.info("Task process started: %s", task_id)
@@ -63,26 +63,15 @@ class WorkerServerEndpoint(object):
 
         p.join()
 
-        try:
-            result = mp_q.get(False)
-        except queue.Empty:
+        if mp_q.empty():
             raise Exception("Task process terminated")
+        result = mp_q.get(False)
 
         if isinstance(result, str):
             raise exception.TaskProcessException(result)
         return result
 
     def export_instance(self, ctxt, task_id, origin, instance):
-        def _export_instance(export_provider, connection_info,
-                             instance, export_path, mp_q):
-            try:
-                vm_info = export_provider.export_instance(
-                    connection_info, instance, export_path)
-                mp_q.put(vm_info)
-            except Exception as ex:
-                mp_q.put(utils.get_exception_details())
-                LOG.exception(ex)
-
         try:
             export_provider = factory.get_provider(
                 origin["type"], constants.PROVIDER_TYPE_EXPORT)
@@ -111,16 +100,6 @@ class WorkerServerEndpoint(object):
 
     def import_instance(self, ctxt, task_id, destination, instance,
                         export_info):
-        def _import_instance(import_provider, connection_info,
-                             target_environment, instance, export_info, mp_q):
-            try:
-                import_provider.import_instance(
-                    connection_info, target_environment, instance, export_info)
-                mp_q.put(None)
-            except Exception as ex:
-                mp_q.put(utils.get_exception_details())
-                LOG.exception(ex)
-
         try:
             import_provider = factory.get_provider(
                 destination["type"], constants.PROVIDER_TYPE_IMPORT)
@@ -144,3 +123,25 @@ class WorkerServerEndpoint(object):
                 ctxt, task_id, stack_trace)
         finally:
             self._cleanup_task_resources(task_id)
+
+
+def _export_instance(export_provider, connection_info,
+                     instance, export_path, mp_q):
+    try:
+        vm_info = export_provider.export_instance(
+            connection_info, instance, export_path)
+        mp_q.put(vm_info)
+    except Exception as ex:
+        mp_q.put(utils.get_exception_details())
+        LOG.exception(ex)
+
+
+def _import_instance(import_provider, connection_info,
+                     target_environment, instance, export_info, mp_q):
+    try:
+        import_provider.import_instance(
+            connection_info, target_environment, instance, export_info)
+        mp_q.put(None)
+    except Exception as ex:
+        mp_q.put(utils.get_exception_details())
+        LOG.exception(ex)