Alessandro Pilotti 10 лет назад
Родитель
Сommit
7a34a8619f

+ 6 - 0
coriolis/conductor/rpc/client.py

@@ -38,3 +38,9 @@ class ConductorClient(object):
     def set_task_error(self, ctxt, task_id, exception_details):
         self._client.call(ctxt, 'set_task_error', task_id=task_id,
                           exception_details=exception_details)
+
+    def task_progress_update(self, ctxt, task_id, current_step, total_steps,
+                             message):
+        self._client.cast(ctxt, 'task_progress_update', task_id=task_id,
+                          current_step=current_step, total_steps=total_steps,
+                          message=message)

+ 6 - 0
coriolis/conductor/rpc/server.py

@@ -131,3 +131,9 @@ class ConductorServerEndpoint(object):
         LOG.error("Migration failed: %s", migration.id)
         db_api.set_migration_status(
             ctxt, migration.id, constants.MIGRATION_STATUS_ERROR)
+
+    def task_progress_update(self, ctxt, task_id, current_step, total_steps,
+                             message):
+        LOG.info("Task progress update: %s", task_id)
+        db_api.add_task_progress_update(ctxt, task_id, current_step,
+                                        total_steps, message)

+ 12 - 1
coriolis/db/api.py

@@ -42,7 +42,7 @@ def get_migrations(context):
 @enginefacade.reader
 def get_migration(context, migration_id):
     return context.session.query(models.Migration).options(
-        orm.joinedload("tasks")).filter_by(
+        orm.joinedload("tasks").joinedload("progress_updates")).filter_by(
         project_id=context.tenant, id=migration_id).first()
 
 
@@ -84,3 +84,14 @@ def get_task(context, task_id, include_migration_tasks=False):
 
     return context.session.query(models.Task).options(join_options).filter_by(
         id=task_id).first()
+
+
+@enginefacade.writer
+def add_task_progress_update(context, task_id, current_step, total_steps,
+                             message):
+    task_progress_update = models.TaskProgressUpdate()
+    task_progress_update.task_id = task_id
+    task_progress_update.current_step = current_step
+    task_progress_update.total_steps = total_steps
+    task_progress_update.message = message
+    context.session.add(task_progress_update)

+ 17 - 0
coriolis/db/sqlalchemy/migrate_repo/versions/001_initial.py

@@ -45,9 +45,26 @@ def upgrade(migrate_engine):
         mysql_charset='utf8'
     )
 
+    task_progress_update = sqlalchemy.Table(
+        'task_progress_update', meta,
+        sqlalchemy.Column('id', sqlalchemy.String(36), primary_key=True,
+                          default=lambda: str(uuid.uuid4())),
+        sqlalchemy.Column('created_at', sqlalchemy.DateTime),
+        sqlalchemy.Column('updated_at', sqlalchemy.DateTime),
+        sqlalchemy.Column("task_id", sqlalchemy.String(36),
+                          sqlalchemy.ForeignKey('task.id'),
+                          nullable=False),
+        sqlalchemy.Column("current_step", sqlalchemy.Integer, nullable=False),
+        sqlalchemy.Column("total_steps", sqlalchemy.Integer, nullable=True),
+        sqlalchemy.Column("message", sqlalchemy.String(1024), nullable=True),
+        mysql_engine='InnoDB',
+        mysql_charset='utf8'
+    )
+
     tables = (
         migration,
         task,
+        task_progress_update,
     )
 
     for index, table in enumerate(tables):

+ 17 - 0
coriolis/db/sqlalchemy/models.py

@@ -10,6 +10,20 @@ from coriolis.db.sqlalchemy import types
 BASE = declarative.declarative_base()
 
 
+class TaskProgressUpdate(BASE, models.TimestampMixin, models.ModelBase):
+    __tablename__ = 'task_progress_update'
+
+    id = sqlalchemy.Column(sqlalchemy.String(36),
+                           default=lambda: str(uuid.uuid4()),
+                           primary_key=True)
+    task_id = sqlalchemy.Column(sqlalchemy.String(36),
+                                sqlalchemy.ForeignKey('task.id'),
+                                nullable=False)
+    current_step = sqlalchemy.Column(sqlalchemy.Integer, nullable=False)
+    total_steps = sqlalchemy.Column(sqlalchemy.Integer, nullable=True)
+    message = sqlalchemy.Column(sqlalchemy.String(1024), nullable=True)
+
+
 class Task(BASE, models.TimestampMixin, models.ModelBase):
     __tablename__ = 'task'
 
@@ -26,6 +40,9 @@ class Task(BASE, models.TimestampMixin, models.ModelBase):
     task_type = sqlalchemy.Column(sqlalchemy.String(100), nullable=False)
     exception_details = sqlalchemy.Column(sqlalchemy.Text, nullable=True)
     depends_on = sqlalchemy.Column(types.List, nullable=True)
+    progress_updates = orm.relationship(TaskProgressUpdate,
+                                        cascade="all,delete",
+                                        backref=orm.backref('task'))
 
 
 class Migration(BASE, models.TimestampMixin, models.ModelBase):

+ 2 - 0
coriolis/osmorphing/base.py

@@ -7,6 +7,8 @@ from coriolis import utils
 
 
 class BaseOSMorphingTools(object):
+    __metaclass__ = abc.ABCMeta
+
     _packages = {}
 
     def __init__(self, ssh, os_root_dir, hypervisor, platform):

+ 44 - 3
coriolis/providers/base.py

@@ -1,10 +1,51 @@
+import abc
+
+
 class Baseprovider(object):
-    pass
+    __metaclass__ = abc.ABCMeta
+
+    def __init__(self):
+        self._progress_update_manager = None
+        self._current_step = 0
+        self._total_steps = None
+
+    def set_progress_update_manager(self, progress_update_manager):
+        self._progress_update_manager = progress_update_manager
+
+    def _set_total_progress_steps(self, total_steps):
+        self._total_steps = total_steps
+
+    def _progress_update(self, message):
+        self._current_step += 1
+        if self._progress_update_manager:
+            self._progress_update_manager.progress_update(
+                self._current_step, self._total_steps, message)
+
+    @abc.abstractmethod
+    def validate_connection_info(self, connection_info):
+        pass
 
 
 class BaseImportProvider(Baseprovider):
-    pass
+    __metaclass__ = abc.ABCMeta
+
+    @abc.abstractmethod
+    def import_instance(self, connection_info, target_environment,
+                        instance_name, export_info):
+        pass
 
 
 class BaseExportProvider(Baseprovider):
-    pass
+    __metaclass__ = abc.ABCMeta
+
+    @abc.abstractmethod
+    def export_instance(self, connection_info, instance_name, export_path):
+        pass
+
+
+class BaseProgressUpdateManager(object):
+    __metaclass__ = abc.ABCMeta
+
+    @abc.abstractmethod
+    def progress_update(self, current_step, total_steps, message):
+        pass

+ 33 - 2
coriolis/providers/openstack/__init__.py

@@ -212,10 +212,15 @@ class ImportProvider(base.BaseExportProvider):
         try:
             migr_keypair_name = self._get_unique_name()
 
+            self._progress_update("Creating migration worker instance keypair")
+
             k = paramiko.RSAKey.generate(2048)
             public_key = "ssh-rsa %s tmp@migration" % k.get_base64()
             keypair = self._create_keypair(nova, migr_keypair_name, public_key)
 
+            self._progress_update(
+                "Creating migration worker instance Neutron port")
+
             port = self._create_neutron_port(neutron, migr_network_name)
             userdata = MIGR_USER_DATA % (MIGR_GUEST_USERNAME, public_key)
             instance = nova.servers.create(
@@ -226,12 +231,18 @@ class ImportProvider(base.BaseExportProvider):
                 userdata=userdata,
                 nics=[{'port-id': port['id']}])
 
+            self._progress_update(
+                "Adding migration worker instance floating IP")
+
             floating_ip = nova.floating_ips.create(pool=migr_fip_pool_name)
             self._wait_for_instance(nova, instance, 'ACTIVE')
 
             LOG.info("Floating IP: %s", floating_ip.ip)
             instance.add_floating_ip(floating_ip)
 
+            self._progress_update(
+                "Adding migration worker instance security group")
+
             migr_sec_group_name = self._get_unique_name()
             sec_group = nova.security_groups.create(
                 name=migr_sec_group_name, description=migr_sec_group_name)
@@ -242,8 +253,10 @@ class ImportProvider(base.BaseExportProvider):
                 to_port=SSH_PORT)
             instance.add_security_group(sec_group.id)
 
-            LOG.info("Waiting for connectivity on host: %(ip)s:%(port)s",
-                     {"ip": floating_ip.ip, "port": SSH_PORT})
+            self._progress_update(
+                "Waiting for connectivity on host: %(ip)s:%(port)s" %
+                {"ip": floating_ip.ip, "port": SSH_PORT})
+
             utils.wait_for_port_connectivity(floating_ip.ip, SSH_PORT)
 
             return _MigrationResources(nova, neutron, keypair, instance, port,
@@ -315,6 +328,8 @@ class ImportProvider(base.BaseExportProvider):
                 #    utils.convert_disk_format(disk_path, target_disk_path,
                 #                              target_disk_format)
 
+                self._progress_update("Uploading Glance image")
+
                 disk_format = disk_file_info["format"]
                 image = self._create_image(
                     glance, self._get_unique_name(),
@@ -326,6 +341,8 @@ class ImportProvider(base.BaseExportProvider):
                 if disk_format != constants.DISK_FORMAT_RAW:
                     virtual_disk_size += DISK_HEADER_SIZE
 
+                self._progress_update("Creating Cinder volume")
+
                 volume_size_gb = math.ceil(virtual_disk_size / units.Gi)
                 volume = nova.volumes.create(
                     size=volume_size_gb,
@@ -342,8 +359,13 @@ class ImportProvider(base.BaseExportProvider):
         try:
             for i, volume in enumerate(volumes):
                 self._wait_for_volume(nova, volume, 'available')
+
+                self._progress_update("Deleting Glance image")
+
                 glance.images.delete(images[i].id)
 
+                self._progress_update("Attaching volume to worker instance")
+
                 # TODO: improve device assignment
                 volume_dev = "/dev/sd%s" % chr(ord('a') + i + 1)
                 volume_devs.append(volume_dev)
@@ -352,19 +374,28 @@ class ImportProvider(base.BaseExportProvider):
 
                 guest_conn_info = migr_resources.get_guest_connection_info()
 
+            self._progress_update("Preparing instance for target platform")
             osmorphing_manager.morph_image(guest_conn_info,
                                            hypervisor_type,
                                            constants.PLATFORM_OPENSTACK,
                                            volume_devs,
                                            nics_info)
         finally:
+            self._progress_update("Removing worker instance resources")
+
             migr_resources.delete()
 
         ports = []
         for nic_info in nics_info:
+            self._progress_update(
+                "Creating Neutron port for migrated instance")
+
             ports.append(self._create_neutron_port(
                 neutron, network_name, nic_info.get("mac_address")))
 
+        self._progress_update(
+            "Creating migrated instance")
+
         instance = self._create_target_instance(
             nova, flavor_name, instance_name, keypair_name, ports, volumes,
             migr_image_name)

+ 6 - 0
coriolis/providers/vmware_vsphere/__init__.py

@@ -257,13 +257,19 @@ class ExportProvider(base.BaseExportProvider):
         if allow_untrusted:
             context.verify_mode = ssl.CERT_NONE
 
+        self._set_total_progress_steps(4)
+
+        self._progress_update("Connecting to vSphere host")
         si = self._connect(host, username, password, port, context)
         try:
+            self._progress_update("Getting VM info")
             vm_info, vm = self._get_vm_info(si, instance_name)
+            self._progress_update("Exporting disks")
             disk_paths = self._export_disks(vm, export_path, context)
         finally:
             connect.Disconnect(si)
 
+        self._progress_update("Converting virtual disks format")
         for disk_path in disk_paths:
             path = disk_path["path"]
             LOG.info("Converting VMDK type: %s" % path)

+ 56 - 19
coriolis/worker/rpc/server.py

@@ -9,6 +9,7 @@ import psutil
 from coriolis.conductor.rpc import client as rpc_conductor_client
 from coriolis import constants
 from coriolis import exception
+from coriolis.providers import base
 from coriolis.providers import factory
 from coriolis import utils
 
@@ -23,9 +24,23 @@ CONF.register_opts(worker_opts, 'worker')
 
 LOG = logging.getLogger(__name__)
 
+TMP_DIRS_KEY = "__tmp_dirs"
+
 VERSION = "1.0"
 
 
+class _ConductorProgressUpdateManager(base.BaseProgressUpdateManager):
+    def __init__(self, ctxt, task_id):
+        self._ctxt = ctxt
+        self._task_id = task_id
+        self._rpc_conductor_client = rpc_conductor_client.ConductorClient()
+
+    def progress_update(self, current_step, total_steps, message):
+        LOG.info("Progress update: %s", message)
+        self._rpc_conductor_client.task_progress_update(
+            self._ctxt, self._task_id, current_step, total_steps, message)
+
+
 class WorkerServerEndpoint(object):
     def __init__(self):
         self._server = utils.get_hostname()
@@ -34,15 +49,28 @@ class WorkerServerEndpoint(object):
     def _get_task_export_path(self, task_id):
         return os.path.join(CONF.worker.export_base_path, task_id)
 
-    def _cleanup_task_resources(self, task_id):
+    def _cleanup_task_resources(self, task_id, task_info=None):
         try:
             export_path = self._get_task_export_path(task_id)
-            if os.path.exists(export_path):
-                shutil.rmtree(export_path)
+            if (not task_info or export_path not in
+                    task_info.get(TMP_DIRS_KEY, [])):
+                # Don't remove folder if it's needed by the dependent tasks
+                if os.path.exists(export_path):
+                    shutil.rmtree(export_path)
         except Exception as ex:
             # Ignore the exception
             LOG.exception(ex)
 
+    def _remove_tmp_dirs(self, task_info):
+        if task_info:
+            for tmp_dir in task_info.get(TMP_DIRS_KEY, []):
+                if os.path.exists(tmp_dir):
+                    try:
+                        shutil.rmtree(tmp_dir)
+                    except Exception as ex:
+                        # Ignore exception
+                        LOG.exception(ex)
+
     def stop_task(self, ctxt, process_id):
         try:
             p = psutil.Process(process_id)
@@ -53,7 +81,7 @@ class WorkerServerEndpoint(object):
     def _exec_task_process(self, ctxt, task_id, target, args):
         mp_ctx = multiprocessing.get_context('spawn')
         mp_q = mp_ctx.Queue()
-        p = mp_ctx.Process(target=target, args=(args + (mp_q,)))
+        p = mp_ctx.Process(target=target, args=(args + (ctxt, task_id, mp_q,)))
 
         p.start()
         LOG.info("Task process started: %s", task_id)
@@ -73,7 +101,7 @@ class WorkerServerEndpoint(object):
     def exec_task(self, ctxt, task_id, task_type, origin, destination,
                   instance, task_info):
         try:
-            task_info = None
+            new_task_info = None
 
             if task_type == constants.TASK_TYPE_EXPORT_INSTANCE:
                 provider = factory.get_provider(
@@ -82,11 +110,13 @@ class WorkerServerEndpoint(object):
                 if not os.path.exists(export_path):
                     os.makedirs(export_path)
 
-                task_info = self._exec_task_process(
+                new_task_info = self._exec_task_process(
                     ctxt, task_id, _export_instance,
                     (provider, origin["connection_info"],
                      instance, export_path))
 
+                new_task_info[TMP_DIRS_KEY] = [export_path]
+
             elif task_type == constants.TASK_TYPE_IMPORT_INSTANCE:
                 provider = factory.get_provider(
                     destination["type"], constants.PROVIDER_TYPE_IMPORT)
@@ -101,41 +131,48 @@ class WorkerServerEndpoint(object):
                                                   task_type)
 
             LOG.info("Task completed: %s", task_id)
-            LOG.info("Task info: %s", task_info)
-            self._rpc_conductor_client.task_completed(ctxt, task_id, task_info)
+            LOG.info("Task info: %s", new_task_info)
+            self._rpc_conductor_client.task_completed(ctxt, task_id,
+                                                      new_task_info)
 
-            # Resources are needed by dependent import tasks
-            if task_type != constants.TASK_TYPE_EXPORT_INSTANCE:
-                self._cleanup_task_resources(task_id)
+            self._cleanup_task_resources(task_id, new_task_info)
         except Exception as ex:
             LOG.exception(ex)
             self._rpc_conductor_client.set_task_error(ctxt, task_id, str(ex))
 
             self._cleanup_task_resources(task_id)
+        finally:
+            self._remove_tmp_dirs(task_info)
 
 
-def _export_instance(export_provider, connection_info,
-                     instance, export_path, mp_q):
+def _export_instance(provider, connection_info, instance, export_path,
+                     ctxt, task_id, mp_q):
     try:
         # Setting up logging, needed since this is a new process
         utils.setup_logging()
 
-        vm_info = export_provider.export_instance(
-            connection_info, instance, export_path)
+        progress_update_manager = _ConductorProgressUpdateManager(ctxt,
+                                                                  task_id)
+        provider.set_progress_update_manager(progress_update_manager)
+        vm_info = provider.export_instance(connection_info, instance,
+                                           export_path)
         mp_q.put(vm_info)
     except Exception as ex:
         mp_q.put(str(ex))
         LOG.exception(ex)
 
 
-def _import_instance(import_provider, connection_info,
-                     target_environment, instance, export_info, mp_q):
+def _import_instance(provider, connection_info, target_environment, instance,
+                     export_info, ctxt, task_id, mp_q):
     try:
         # Setting up logging, needed since this is a new process
         utils.setup_logging()
 
-        import_provider.import_instance(
-            connection_info, target_environment, instance, export_info)
+        progress_update_manager = _ConductorProgressUpdateManager(ctxt,
+                                                                  task_id)
+        provider.set_progress_update_manager(progress_update_manager)
+        provider.import_instance(connection_info, target_environment,
+                                 instance, export_info)
         mp_q.put(None)
     except Exception as ex:
         mp_q.put(str(ex))