Procházet zdrojové kódy

Adds OpenStack replica support

Alessandro Pilotti před 9 roky
rodič
revize
44cf3bf038

+ 4 - 0
coriolis/exception.py

@@ -237,6 +237,10 @@ class VolumeSnapshotNotFound(NotFound):
     message = _("Volume snapshot \"%(snapshot_id)s\" could not be found.")
 
 
+class VolumeBackupNotFound(NotFound):
+    message = _("Volume backup \"%(backup_id)s\" could not be found.")
+
+
 class Duplicate(CoriolisException):
     pass
 

+ 168 - 0
coriolis/providers/backup_writers.py

@@ -0,0 +1,168 @@
+# Copyright 2016 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+import abc
+import contextlib
+import os
+
+from oslo_log import log as logging
+import paramiko
+
+from coriolis import data_transfer
+from coriolis import exception
+from coriolis import utils
+
+LOG = logging.getLogger(__name__)
+
+
+class BaseBackupWriter(metaclass=abc.ABCMeta):
+    @abc.abstractmethod
+    def _open(self):
+        pass
+
+    @contextlib.contextmanager
+    def open(self, path, disk_id):
+        self._path = path
+        self._disk_id = disk_id
+        self._open()
+        try:
+            yield self
+        finally:
+            self.close()
+
+    @abc.abstractmethod
+    def seek(self, pos):
+        pass
+
+    @abc.abstractmethod
+    def truncate(self, size):
+        pass
+
+    @abc.abstractmethod
+    def write(self, data):
+        pass
+
+    @abc.abstractmethod
+    def close(self):
+        pass
+
+
+class FileBackupWriter(BaseBackupWriter):
+    def _open(self):
+        # Create file if it doesn't exist
+        open(self._path, 'ab+').close()
+        self._file = open(self._path, 'rb+')
+
+    def seek(self, pos):
+        self._file.seek(pos)
+
+    def truncate(self, size):
+        self._file.truncate(size)
+
+    def write(self, data):
+        self._file.write(data)
+
+    def close(self):
+        self._file.close()
+
+
+class SSHBackupWriter(BaseBackupWriter):
+    def __init__(self, ip, port, username, pkey, password, volumes_info):
+        self._ip = ip
+        self._port = port
+        self._username = username
+        self._pkey = pkey
+        self._password = password
+        self._volumes_info = volumes_info
+        self._ssh = None
+
+    @contextlib.contextmanager
+    def open(self, path, disk_id):
+        self._path = path
+        self._disk_id = disk_id
+        self._open()
+        try:
+            yield self
+            # Don't send a message via ssh on exception
+            self.close()
+        except:
+            self._ssh.close()
+            raise
+
+    @utils.retry_on_error()
+    def _connect_ssh(self):
+        LOG.info("Connecting to SSH host: %(ip)s:%(port)s" %
+                 {"ip": self._ip, "port": self._port})
+        self._ssh = paramiko.SSHClient()
+        self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+        self._ssh.connect(
+            hostname=self._ip,
+            port=self._port,
+            username=self._username,
+            pkey=self._pkey,
+            password=self._password)
+
+    @utils.retry_on_error()
+    def _copy_helper_cmd(self):
+        sftp = self._ssh.open_sftp()
+        local_path = os.path.join(
+            utils.get_resources_dir(), 'write_data')
+        sftp.put(local_path, 'write_data')
+        sftp.close()
+
+    @utils.retry_on_error()
+    def _exec_helper_cmd(self):
+        self._msg_id = 0
+        self._offset = 0
+        self._stdin, self._stdout, self._stderr = self._ssh.exec_command(
+            "chmod +x write_data && sudo ./write_data")
+
+    def _encode_data(self, content):
+        path = [v for v in self._volumes_info
+                if v["disk_id"] == self._disk_id][0]["volume_dev"]
+
+        msg = data_transfer.encode_data(
+            self._msg_id, path, self._offset, content)
+
+        LOG.debug(
+            "Guest path: %(path)s, offset: %(offset)d, content len: "
+            "%(content_len)d, msg len: %(msg_len)d",
+            {"path": path, "offset": self._offset, "content_len": len(content),
+             "msg_len": len(msg)})
+        return msg
+
+    def _encode_eod(self):
+        msg = data_transfer.encode_eod(self._msg_id)
+        LOG.debug("EOD message len: %d", len(msg))
+        return msg
+
+    @utils.retry_on_error()
+    def _send_msg(self, data):
+        self._msg_id += 1
+        self._stdin.write(data)
+        self._stdin.flush()
+        out_msg_id = self._stdout.read(4)
+
+    def _open(self):
+        self._connect_ssh()
+        self._copy_helper_cmd()
+        self._exec_helper_cmd()
+
+    def seek(self, pos):
+        self._offset = pos
+
+    def truncate(self, size):
+        pass
+
+    def write(self, data):
+        self._send_msg(self._encode_data(data))
+        self._offset += len(data)
+
+    def close(self):
+        self._send_msg(self._encode_eod())
+        ret_val = self._stdout.channel.recv_exit_status()
+        if ret_val:
+            raise exception.CoriolisException(
+                "An exception occurred while writing data on target. "
+                "Error code: %s" % ret_val)
+        self._ssh.close()

+ 65 - 4
coriolis/providers/openstack/common.py

@@ -7,6 +7,7 @@ import time
 import uuid
 
 from oslo_utils import units
+from swiftclient import client as swift_client
 
 from coriolis import exception
 from coriolis import utils
@@ -84,8 +85,8 @@ def wait_for_image(nova, image_id, expected_status='ACTIVE'):
 
 
 @utils.retry_on_error()
-def wait_for_instance(nova, instance, expected_status='ACTIVE'):
-    instance = nova.servers.get(instance.id)
+def wait_for_instance(nova, instance_id, expected_status='ACTIVE'):
+    instance = nova.servers.get(instance_id)
     while instance.status not in [expected_status, 'ERROR']:
         time.sleep(2)
         instance = nova.servers.get(instance.id)
@@ -182,8 +183,8 @@ def delete_volume(cinder, volume_id):
 
 
 @utils.retry_on_error()
-def create_volume_snapshot(cinder, volume_id, name):
-    return cinder.volume_snapshots.create(volume_id, name=name)
+def create_volume_snapshot(cinder, volume_id, name, force=False):
+    return cinder.volume_snapshots.create(volume_id, name=name, force=force)
 
 
 @utils.retry_on_error(terminal_exceptions=[exception.NotFound])
@@ -216,3 +217,63 @@ def delete_volume_snapshot(cinder, snapshot_id):
     snapshots = cinder.volume_snapshots.findall(id=snapshot_id)
     for snapshot in snapshots:
         return cinder.volume_snapshots.delete(snapshot.id)
+
+
+@utils.retry_on_error()
+def create_volume_backup(cinder, volume_id, snapshot_id, name, container,
+                         incremental, force=False):
+    return cinder.backups.create(
+        volume_id=volume_id,
+        snapshot_id=snapshot_id,
+        container=container,
+        name=name,
+        incremental=incremental,
+        force=force)
+
+
+@utils.retry_on_error(terminal_exceptions=[exception.NotFound])
+def wait_for_volume_backup(cinder, backup_id, expected_status='available'):
+    backups = cinder.backups.findall(id=backup_id)
+
+    if not backups:
+        if expected_status == 'deleted':
+            return
+        raise exception.VolumeBackupNotFound(backup_id=backup_id)
+    backup = backups[0]
+
+    while backup.status not in [expected_status, 'error']:
+        time.sleep(2)
+        if expected_status == 'deleted':
+            backups = cinder.backups.findall(id=backup_id)
+            if not backups:
+                return
+            backup = backups[0]
+        else:
+            backup = cinder.backups.get(backup.id)
+    if backup.status != expected_status:
+        raise exception.CoriolisException(
+            "Volume backup is in status: %s" % backup.status)
+
+
+@utils.retry_on_error()
+def delete_volume_backup(cinder, backup_id):
+    cinder.backups.delete(backup_id)
+
+
+@utils.retry_on_error()
+def find_volume_backups(cinder, volume_id=None, container=None):
+    return cinder.backups.list(search_opts={
+        "volume_id": volume_id,
+        "container": container})
+
+
+@utils.retry_on_error()
+def get_instance_volumes(nova, instance_id):
+    return nova.volumes.get_server_volumes(instance_id)
+
+
+def get_swift_client(session):
+    preauthurl = session.get_endpoint(service_type="object-store")
+    preauthtoken = session.get_token()
+    return swift_client.Connection(preauthurl=preauthurl,
+                                   preauthtoken=preauthtoken)

+ 287 - 33
coriolis/providers/openstack/exp.py

@@ -1,27 +1,43 @@
 # Copyright 2016 Cloudbase Solutions Srl
 # All Rights Reserved.
-
+import gc
+import json
 import os
+import re
+import zlib
 
+from cinderclient import client as cinder_client
 from glanceclient import client as glance_client
 from novaclient import client as nova_client
 from oslo_config import cfg
 from oslo_log import log as logging
+from oslo_utils import units
 
 from coriolis import constants
 from coriolis import events
 from coriolis import exception
 from coriolis import keystone
+from coriolis.providers import backup_writers
 from coriolis.providers import base
 from coriolis.providers.openstack import common
 from coriolis import schemas
 from coriolis import utils
 
+opts = [
+    cfg.StrOpt('volume_backups_container',
+               default="coriolis",
+               help='Cinder volume backups container name.'),
+]
+
 CONF = cfg.CONF
+CONF.register_opts(opts, 'openstack_migration_provider')
+
 LOG = logging.getLogger(__name__)
 
+VOLUME_BACKUP_VERSION = '1.0.0'
+
 
-class ExportProvider(base.BaseExportProvider):
+class ExportProvider(base.BaseExportProvider, base.BaseReplicaExportProvider):
 
     platform = constants.PLATFORM_OPENSTACK
 
@@ -132,23 +148,13 @@ class ExportProvider(base.BaseExportProvider):
             size=image_size
         )
 
-    def export_instance(self, ctxt, connection_info, instance_name,
-                        export_path):
-        session = keystone.create_keystone_session(ctxt, connection_info)
-
-        glance_api_version = connection_info.get("image_api_version",
-                                                 common.GLANCE_API_VERSION)
-
-        nova = nova_client.Client(common.NOVA_API_VERSION, session=session)
-        glance = glance_client.Client(glance_api_version, session=session)
-
+    def _get_instance_info(self, nova, cinder, instance_name):
         self._event_manager.progress_update("Retrieving OpenStack instance")
         instance = self._get_instance(nova, instance_name)
 
         @utils.retry_on_error()
         def _get_flavor_by_id():
             return nova.flavors.get(instance.flavor["id"])
-
         flavor = _get_flavor_by_id()
 
         nics = []
@@ -164,6 +170,38 @@ class ExportProvider(base.BaseExportProvider):
                          'network_id': iface.net_id,
                          'network_name': net_name})
 
+        disks = []
+        vol_attachments = common.get_instance_volumes(nova, instance.id)
+        for vol_attachment in vol_attachments:
+            volume = cinder.volumes.get(vol_attachment.volumeId)
+            disks.append({
+                'format': constants.DISK_FORMAT_RAW,
+                'guest_device': vol_attachment.device,
+                'size_bytes': volume.size * units.Gi,
+                'id': volume.id
+            })
+
+        vm_info = {
+            'num_cpu': flavor.vcpus,
+            'num_cores_per_socket': 1,
+            'memory_mb': flavor.ram,
+            'nested_virtualization': False,
+            'name': instance_name,
+            'id': instance.id,
+            'flavor_name': flavor.name,
+            'devices': {
+                "nics": nics,
+                "disks": disks,
+                "cdroms": [],
+                "serial_ports": [],
+                "floppies": [],
+                "controllers": []
+            }
+        }
+
+        return instance, vm_info
+
+    def _check_shutdown_instance(self, nova, instance):
         if instance.status != 'SHUTOFF':
             self._event_manager.progress_update(
                 "Shutting down instance")
@@ -173,10 +211,23 @@ class ExportProvider(base.BaseExportProvider):
                 instance.stop()
 
             _stop_instance()
-            common.wait_for_instance(nova, instance, 'SHUTOFF')
+            common.wait_for_instance(nova, instance.id, 'SHUTOFF')
 
-        self._event_manager.progress_update("Creating instance snapshot")
+    def export_instance(self, ctxt, connection_info, instance_name,
+                        export_path):
+        session = keystone.create_keystone_session(ctxt, connection_info)
+        glance_api_version = connection_info.get("image_api_version",
+                                                 common.GLANCE_API_VERSION)
+        nova = nova_client.Client(common.NOVA_API_VERSION, session=session)
+        glance = glance_client.Client(glance_api_version, session=session)
+        cinder = cinder_client.Client(common.CINDER_API_VERSION,
+                                      session=session)
 
+        instance, vm_info = self._get_instance_info(
+            nova, cinder, instance_name)
+        self._check_shutdown_instance(nova, instance)
+
+        self._event_manager.progress_update("Creating instance snapshot")
         image = self._create_snapshot(
             nova, glance, instance, export_path)
 
@@ -188,24 +239,227 @@ class ExportProvider(base.BaseExportProvider):
             'id': str(image.id)
         })
 
-        vm_info = {
-            'num_cpu': flavor.vcpus,
-            'num_cores_per_socket': 1,
-            'memory_mb': flavor.ram,
-            'nested_virtualization': False,
-            'name': instance_name,
-            'os_type': image.os_type,
-            'id': instance.id,
-            'flavor_name': flavor.name,
-            'devices': {
-                "nics": nics,
-                "disks": disks,
-                "cdroms": [],
-                "serial_ports": [],
-                "floppies": [],
-                "controllers": []
-            }
-        }
+        vm_info['os_type'] = image.os_type
+        vm_info['devices']['disks'] = disks
 
         LOG.info("vm info: %s" % str(vm_info))
         return vm_info
+
+    def get_replica_instance_info(self, ctxt, connection_info, instance_name):
+        session = keystone.create_keystone_session(ctxt, connection_info)
+        nova = nova_client.Client(common.NOVA_API_VERSION, session=session)
+        cinder = cinder_client.Client(common.CINDER_API_VERSION,
+                                      session=session)
+
+        instance, vm_info = self._get_instance_info(
+            nova, cinder, instance_name)
+        return vm_info
+
+    def deploy_replica_source_resources(self, ctxt, connection_info):
+        pass
+
+    def delete_replica_source_resources(self, ctxt, connection_info,
+                                        migr_resources_dict):
+        pass
+
+    @utils.retry_on_error()
+    def _get_backup_metadata(self, swift, volume_id, backup_id, container):
+        objects = swift.get_container(container)[1]
+
+        base_re = (r'volume_%(volume_id)s/\d+/az_nova_backup_%(backup_id)s' %
+                   {'volume_id': volume_id, 'backup_id': backup_id})
+
+        metadata_object_name = None
+        for obj in objects:
+            if re.match("%s_metadata" % base_re, obj["name"]):
+                metadata_object_name = obj["name"]
+                break
+
+        if not metadata_object_name:
+            raise exception.NotFound('Cinder backup metadata not found')
+
+        metadata_json = swift.get_object(container, metadata_object_name)[1]
+        metadata = json.loads(metadata_json.decode())
+
+        if metadata['version'] != VOLUME_BACKUP_VERSION:
+            raise exception.CoriolisException(
+                "Unsupported Cinder backup metadata version: %s" %
+                metadata['version'])
+
+        if metadata['backup_id'] != backup_id:
+            raise exception.CoriolisException(
+                "Metadata backup id does not match")
+
+        return metadata
+
+    @utils.retry_on_error()
+    def _read_backup_metadata_object(self, swift, metadata, container):
+        name, info = list(metadata.items())[0]
+        offset = info.get('offset', 0)
+        compression = info.get('compression')
+        md5 = info.get('md5')
+
+        data = swift.get_object(container, name)[1]
+
+        if compression == 'zlib':
+            data = zlib.decompress(data)
+        elif compression:
+            raise exception.CoriolisException(
+                "Unsupported compression format: %s" % compression)
+
+        if md5:
+            utils.check_md5(data, md5)
+
+        return data, offset
+
+    def replicate_disks(self, ctxt, connection_info, instance_name,
+                        source_conn_info, target_conn_info, volumes_info,
+                        incremental):
+        session = keystone.create_keystone_session(ctxt, connection_info)
+        cinder = cinder_client.Client(common.CINDER_API_VERSION,
+                                      session=session)
+        swift = common.get_swift_client(session)
+
+        ip = target_conn_info["ip"]
+        port = target_conn_info.get("port", 22)
+        username = target_conn_info["username"]
+        pkey = target_conn_info.get("pkey")
+        password = target_conn_info.get("password")
+
+        container = CONF.openstack_migration_provider.volume_backups_container
+
+        volumes_info, backups = self._create_volume_backups(
+            cinder, volumes_info, container)
+
+        LOG.info("Waiting for connectivity on host: %(ip)s:%(port)s",
+                 {"ip": ip, "port": port})
+        utils.wait_for_port_connectivity(ip, port)
+
+        for volume_info in volumes_info:
+            self._event_manager.progress_update(
+                "Retrieving backup metadata")
+
+            last_backup_id = volume_info.get("last_backup_id")
+            backup_metadata_list = []
+
+            volume_id = volume_info["disk_id"]
+            backup_id = backups[volume_id].id
+
+            # Other backups might have occurred since the last replica
+            # execution, make sure to replicate all data
+            while True:
+                backup_metadata = self._get_backup_metadata(
+                    swift, volume_id, backup_id, container)
+                backup_metadata_list.insert(0, backup_metadata)
+                previous_backup_id = backup_metadata.get("parent_id")
+                if (not previous_backup_id or
+                        previous_backup_id == last_backup_id):
+                    break
+                backup_id = previous_backup_id
+
+            backup_writer = backup_writers.SSHBackupWriter(
+                ip, port, username, pkey, password, volumes_info)
+
+            for backup_metadata in backup_metadata_list:
+                objects_metadata = backup_metadata["objects"]
+                backup_id = backup_metadata["backup_id"]
+
+                total_written_bytes = 0
+                total_bytes = sum([list(o.values())[0]['length']
+                                  for o in objects_metadata])
+
+                perc_step = self._event_manager.add_percentage_step(
+                    total_bytes,
+                    message_format="Volume backup %s replica progress: "
+                    "{:.0f}%%" % backup_id)
+
+                max_chunk_size = 10 * units.Mi
+
+                with backup_writer.open("", volume_id) as f:
+                    for object_metadata in objects_metadata:
+                        data, offset = self._read_backup_metadata_object(
+                            swift, object_metadata, container)
+
+                        f.seek(offset)
+
+                        i = 0
+                        while i < len(data):
+                            f.write(data[i:i + max_chunk_size])
+                            i += max_chunk_size
+
+                        total_written_bytes += len(data)
+                        self._event_manager.set_percentage_step(
+                            perc_step, total_written_bytes)
+
+                        data = None
+                        gc.collect()
+
+            volume_info["last_backup_id"] = backups[volume_id].id
+
+        return volumes_info
+
+    def _create_volume_backups(self, cinder, volumes_info, container):
+        snapshots = {}
+        backups = {}
+        try:
+            self._event_manager.progress_update("Creating volume snapshots")
+            for volume_info in volumes_info:
+                volume_id = volume_info["disk_id"]
+                snapshot = common.create_volume_snapshot(
+                    cinder, volume_id, common.get_unique_name(), force=True)
+                snapshots[volume_id] = snapshot
+
+            self._event_manager.progress_update(
+                "Waiting for volume snapshots to complete")
+            for snapshot in snapshots.values():
+                common.wait_for_volume_snapshot(cinder, snapshot.id)
+
+            self._event_manager.progress_update("Creating volume backups")
+            for volume_info in volumes_info:
+                volume_id = volume_info["disk_id"]
+
+                volume_backups = common.find_volume_backups(
+                    cinder, volume_id, container)
+                incremental = len(volume_backups) > 0
+
+                snapshot = snapshots[volume_id]
+                volume_backup = common.create_volume_backup(
+                    cinder,
+                    volume_id=volume_id,
+                    snapshot_id=snapshot.id,
+                    container=container,
+                    name=common.get_unique_name(),
+                    incremental=incremental)
+                backups[volume_id] = volume_backup
+
+            self._event_manager.progress_update(
+                "Waiting for volume backups to complete")
+            for backup in backups.values():
+                common.wait_for_volume_backup(cinder, backup.id)
+
+            return volumes_info, backups
+        except Exception as ex:
+            LOG.exception(ex)
+            self._event_manager.progress_update(
+                "Deleting volume backups")
+            for backup in backups.values():
+                common.delete_volume_backup(cinder, backup.id)
+            self._event_manager.progress_update(
+                "Waiting for volume backups to be deleted")
+            for backup in backups.values():
+                common.wait_for_volume_backup(cinder, backup.id, 'deleted')
+            raise
+        finally:
+            self._event_manager.progress_update("Deleting volume snapshots")
+            for snapshot in snapshots.values():
+                common.delete_volume_snapshot(cinder, snapshot.id)
+            self._event_manager.progress_update(
+                "Waiting for volume snapshots to be deleted")
+            for snapshot in snapshots.values():
+                common.wait_for_volume_snapshot(cinder, snapshot.id, 'deleted')
+
+    def shutdown_instance(self, ctxt, connection_info, instance_name):
+        session = keystone.create_keystone_session(ctxt, connection_info)
+        nova = nova_client.Client(common.NOVA_API_VERSION, session=session)
+        instance = self._get_instance(nova, instance_name)
+        self._check_shutdown_instance(nova, instance)

+ 2 - 2
coriolis/providers/openstack/imp.py

@@ -287,7 +287,7 @@ class ImportProvider(base.BaseImportProvider, base.BaseReplicaImportProvider):
                 "Adding migration worker instance floating IP")
 
             floating_ip = nova.floating_ips.create(pool=migr_fip_pool_name)
-            common.wait_for_instance(nova, instance, 'ACTIVE')
+            common.wait_for_instance(nova, instance.id, 'ACTIVE')
 
             LOG.info("Floating IP: %s", floating_ip.ip)
             instance.add_floating_ip(floating_ip)
@@ -677,7 +677,7 @@ class ImportProvider(base.BaseImportProvider, base.BaseReplicaImportProvider):
             nics=nics)
 
         try:
-            common.wait_for_instance(nova, instance, 'ACTIVE')
+            common.wait_for_instance(nova, instance.id, 'ACTIVE')
             return instance
         except:
             if instance:

+ 6 - 161
coriolis/providers/vmware_vsphere/__init__.py

@@ -1,12 +1,10 @@
 # Copyright 2016 Cloudbase Solutions Srl
 # All Rights Reserved.
 
-import abc
 import contextlib
 import gc
 import os
 import re
-import struct
 import sys
 import threading
 import time
@@ -16,15 +14,13 @@ import uuid
 import eventlet
 from oslo_config import cfg
 from oslo_log import log as logging
-from oslo_utils import units
-import paramiko
 from pyVim import connect
 from pyVmomi import vim
 
 from coriolis import constants
-from coriolis import data_transfer
 from coriolis import events
 from coriolis import exception
+from coriolis.providers import backup_writers
 from coriolis.providers import base
 from coriolis.providers.vmware_vsphere import guestid
 from coriolis import schemas
@@ -45,159 +41,6 @@ LOG = logging.getLogger(__name__)
 vixdisklib.init()
 
 
-class _BaseBackupWriter(metaclass=abc.ABCMeta):
-    @abc.abstractmethod
-    def _open(self):
-        pass
-
-    @contextlib.contextmanager
-    def open(self, path, disk_id):
-        self._path = path
-        self._disk_id = disk_id
-        self._open()
-        try:
-            yield self
-        finally:
-            self.close()
-
-    @abc.abstractmethod
-    def seek(self, pos):
-        pass
-
-    @abc.abstractmethod
-    def truncate(self, size):
-        pass
-
-    @abc.abstractmethod
-    def write(self, data):
-        pass
-
-    @abc.abstractmethod
-    def close(self):
-        pass
-
-
-class _FileBackupWriter(_BaseBackupWriter):
-    def _open(self):
-        # Create file if it doesnt exist
-        open(self._path, 'ab+').close()
-        self._file = open(self._path, 'rb+')
-
-    def seek(self, pos):
-        self._file.seek(pos)
-
-    def truncate(self, size):
-        self._file.truncate(size)
-
-    def write(self, data):
-        self._file.write(data)
-
-    def close(self):
-        self._file.close()
-
-
-class _SSHBackupWriter(_BaseBackupWriter):
-    def __init__(self, ip, port, username, pkey, password, volumes_info):
-        self._ip = ip
-        self._port = port
-        self._username = username
-        self._pkey = pkey
-        self._password = password
-        self._volumes_info = volumes_info
-        self._ssh = None
-
-    @contextlib.contextmanager
-    def open(self, path, disk_id):
-        self._path = path
-        self._disk_id = disk_id
-        self._open()
-        try:
-            yield self
-            # Don't send a message via ssh on exception
-            self.close()
-        except:
-            self._ssh.close()
-            raise
-
-    @utils.retry_on_error()
-    def _connect_ssh(self):
-        LOG.info("Connecting to SSH host: %(ip)s:%(port)s" %
-                 {"ip": self._ip, "port": self._port})
-        self._ssh = paramiko.SSHClient()
-        self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
-        self._ssh.connect(
-            hostname=self._ip,
-            port=self._port,
-            username=self._username,
-            pkey=self._pkey,
-            password=self._password)
-
-    @utils.retry_on_error()
-    def _copy_helper_cmd(self):
-        sftp = self._ssh.open_sftp()
-        local_path = os.path.join(
-            utils.get_resources_dir(), 'write_data')
-        sftp.put(local_path, 'write_data')
-        sftp.close()
-
-    @utils.retry_on_error()
-    def _exec_helper_cmd(self):
-        self._msg_id = 0
-        self._offset = 0
-        self._stdin, self._stdout, self._stderr = self._ssh.exec_command(
-            "chmod +x write_data && sudo ./write_data")
-
-    def _encode_data(self, content):
-        path = [v for v in self._volumes_info
-                if v["disk_id"] == self._disk_id][0]["volume_dev"]
-
-        msg = data_transfer.encode_data(
-            self._msg_id, path, self._offset, content)
-
-        LOG.debug(
-            "Guest path: %(path)s, offset: %(offset)d, content len: "
-            "%(content_len)d, msg len: %(msg_len)d",
-            {"path": path, "offset": self._offset, "content_len": len(content),
-             "msg_len": len(msg)})
-        return msg
-
-    def _encode_eod(self):
-        msg = data_transfer.encode_eod(self._msg_id)
-        LOG.debug("EOD message len: %d", len(msg))
-        return msg
-
-    @utils.retry_on_error()
-    def _send_msg(self, data):
-        self._msg_id += 1
-        self._stdin.write(data)
-        self._stdin.flush()
-        out_msg_id = self._stdout.read(4)
-
-    def _open(self):
-        self._connect_ssh()
-        self._copy_helper_cmd()
-        self._exec_helper_cmd()
-
-    def seek(self, pos):
-        self._offset = pos
-
-    def truncate(self, size):
-        pass
-
-    def write(self, data):
-        self._send_msg(self._encode_data(data))
-        self._offset += len(data)
-
-    def close(self):
-        self._send_msg(self._encode_eod())
-        ret_val = self._stdout.channel.recv_exit_status()
-        if ret_val:
-            raise exception.CoriolisException(
-                "An exception occurred while writing data on target. "
-                "Error code: %s" % ret_val)
-        self._ssh.close()
-
-
 class ExportProvider(base.BaseExportProvider, base.BaseReplicaExportProvider):
 
     platform = constants.PLATFORM_VMWARE_VSPHERE
@@ -669,7 +512,8 @@ class ExportProvider(base.BaseExportProvider, base.BaseReplicaExportProvider):
         with self._take_temp_vm_snapshot(vm, snapshot_name) as snapshot:
             self._backup_snapshot_disks(
                 snapshot, export_path, connection_info, context,
-                disk_paths, _FileBackupWriter(), incremental=False)
+                disk_paths, backup_writers.FileBackupWriter(),
+                incremental=False)
 
         self._shutdown_vm(vm)
 
@@ -678,7 +522,8 @@ class ExportProvider(base.BaseExportProvider, base.BaseReplicaExportProvider):
         with self._take_temp_vm_snapshot(vm, snapshot_name) as snapshot:
             self._backup_snapshot_disks(
                 snapshot, export_path, connection_info, context,
-                disk_paths, _FileBackupWriter(), incremental=True)
+                disk_paths, backup_writers.FileBackupWriter(),
+                incremental=True)
 
         return disk_paths
 
@@ -756,7 +601,7 @@ class ExportProvider(base.BaseExportProvider, base.BaseReplicaExportProvider):
         with self._connect(connection_info) as (context, si):
             vm = self._get_vm(si, instance_name)
 
-            backup_writer = _SSHBackupWriter(
+            backup_writer = backup_writers.SSHBackupWriter(
                 ip, port, username, pkey, password, volumes_info)
 
             snapshot_name = str(uuid.uuid4())

+ 9 - 0
coriolis/utils.py

@@ -2,6 +2,7 @@
 # All Rights Reserved.
 
 import functools
+import hashlib
 import io
 import json
 import os
@@ -321,3 +322,11 @@ def load_class(class_path):
     parts = class_path.rsplit('.', 1)
     module = __import__(parts[0], fromlist=parts[1])
     return getattr(module, parts[1])
+
+
+def check_md5(data, md5):
+    m = hashlib.md5()
+    m.update(data)
+    new_md5 = m.hexdigest()
+    if new_md5 != md5:
+        raise exception.CoriolisException("MD5 check failed")

+ 1 - 0
requirements.txt

@@ -25,6 +25,7 @@ python-keystoneclient
 python-neutronclient
 python-novaclient
 python-barbicanclient
+python-swiftclient
 pyVmomi
 pywinrm
 PyYAML