Explorar o código

Various worker improvements

Alessandro Pilotti hai 10 anos
pai
achega
0e7a3263cc

+ 1 - 3
coriolis/api/v1/migrations.py

@@ -1,5 +1,3 @@
-from oslo_service import wsgi
-
 from coriolis.api import wsgi as api_wsgi
 from coriolis.api.v1.views import migration_view
 from coriolis import constants
@@ -51,7 +49,7 @@ class MigrationController(object):
         self._migration_api.start(origin, destination, instances)
 
     def delete(self, req, id):
-        print("Delete: %s" % id)
+        self._migration_api.stop(id)
 
 
 def create_resource():

+ 0 - 6
coriolis/api/v1/router.py

@@ -1,12 +1,6 @@
 from oslo_log import log as logging
-from oslo_service import wsgi
-import routes
-
-import webob.dec
-import webob.exc
 
 from coriolis import api
-from coriolis.i18n import _LI, _LE
 from coriolis.api.v1 import migrations
 
 LOG = logging.getLogger(__name__)

+ 11 - 2
coriolis/conductor/rpc/client.py

@@ -26,9 +26,14 @@ class ConductorClient(object):
             ctxt, 'migrate_instances', origin=origin, destination=destination,
             instances=instances)
 
-    def set_task_host(self, ctxt, task_id, host):
+    def stop_instances_migration(self, ctxt, migration_id):
         self._client.call(
-            ctxt, 'set_task_host', task_id=task_id, host=host)
+            ctxt, 'stop_instances_migration', migration_id=migration_id)
+
+    def set_task_host(self, ctxt, task_id, host, process_id):
+        self._client.call(
+            ctxt, 'set_task_host', task_id=task_id, host=host,
+            process_id=process_id)
 
     def export_completed(self, ctxt, task_id, export_info):
         self._client.call(
@@ -37,3 +42,7 @@ class ConductorClient(object):
 
     def import_completed(self, ctxt, task_id):
         self._client.call(ctxt, 'import_completed', task_id=task_id)
+
+    def set_task_error(self, ctxt, task_id, exception_details):
+        self._client.call(ctxt, 'set_task_error', task_id=task_id,
+                          exception_details=exception_details)

+ 35 - 10
coriolis/conductor/rpc/server.py

@@ -2,6 +2,7 @@ import uuid
 
 import json
 
+from oslo_log import log as logging
 import oslo_messaging as messaging
 
 from coriolis import constants
@@ -11,6 +12,8 @@ from coriolis.worker.rpc import client as rpc_worker_client
 
 VERSION = "1.0"
 
+LOG = logging.getLogger(__name__)
+
 
 class ConductorServerEndpoint(object):
     def __init__(self):
@@ -42,24 +45,36 @@ class ConductorServerEndpoint(object):
         migration.destination = json.dumps(destination)
 
         for instance in instances:
-            op = models.Task()
-            op.id = str(uuid.uuid4())
-            op.migration = migration
-            op.instance = instance
-            op.status = constants.TASK_STATUS_STARTED
-            op.task_type = constants.TASK_TYPE_EXPORT
+            task = models.Task()
+            task.id = str(uuid.uuid4())
+            task.migration = migration
+            task.instance = instance
+            task.status = constants.TASK_STATUS_STARTED
+            task.task_type = constants.TASK_TYPE_EXPORT
 
         db_api.add(ctxt, migration)
+        LOG.info("Migration created: %s", migration.id)
 
-        for op in migration.tasks:
+        for task in migration.tasks:
             self._rpc_worker_client.begin_export_instance(
-                ctxt.to_dict(), op.id, origin, instance)
+                ctxt.to_dict(), task.id, origin, instance)
+
+    def stop_instances_migration(self, ctxt, migration_id):
+        # TODO: fix context
+        from coriolis import context
+        ctxt = context.CoriolisContext()
+
+        migration = db_api.get_migration(ctxt, migration_id)
+        for task in migration.tasks:
+            if task.status == constants.TASK_STATUS_STARTED:
+                self._rpc_worker_client.stop_task(
+                    ctxt.to_dict(), task.host, task.process_id)
 
-    def set_task_host(self, ctxt, task_id, host):
+    def set_task_host(self, ctxt, task_id, host, process_id):
         # TODO: fix context
         from coriolis import context
         ctxt = context.CoriolisContext()
-        db_api.set_task_host(ctxt, task_id, host)
+        db_api.set_task_host(ctxt, task_id, host, process_id)
 
     def export_completed(self, ctxt, task_id, export_info):
         # TODO: fix context
@@ -92,3 +107,13 @@ class ConductorServerEndpoint(object):
 
         db_api.update_task_status(
             ctxt, task_id, constants.TASK_STATUS_COMPLETE)
+
+    def set_task_error(self, ctxt, task_id, exception_details):
+        # TODO: fix context
+        from coriolis import context
+        ctxt = context.CoriolisContext()
+
+        db_api.update_task_status(
+            ctxt, task_id, constants.TASK_STATUS_ERROR,
+            exception_details)
+        # TODO: set migration in error state and cancel other tasks

+ 6 - 0
coriolis/constants.py

@@ -11,3 +11,9 @@ TASK_TYPE_IMPORT = "IMPORT"
 
 PROVIDER_TYPE_IMPORT = 1
 PROVIDER_TYPE_EXPORT = 2
+
+DISK_FORMAT_VMDK = 'vmdk'
+DISK_FORMAT_RAW = 'raw'
+DISK_FORMAT_QCOW2 = 'qcow2'
+DISK_FORMAT_VHD = 'vhd'
+DISK_FORMAT_VHDX = 'vhdx'

+ 8 - 6
coriolis/db/api.py

@@ -52,17 +52,19 @@ def add(context, migration):
 
 
 @enginefacade.writer
-def update_task_status(context, task_id, status):
-    op = context.session.query(models.Task).filter_by(
+def update_task_status(context, task_id, status, exception_details=None):
+    task = context.session.query(models.Task).filter_by(
         id=task_id).first()
-    op.status = status
+    task.status = status
+    task.exception_details = exception_details
 
 
 @enginefacade.writer
-def set_task_host(context, task_id, host):
-    op = context.session.query(models.Task).filter_by(
+def set_task_host(context, task_id, host, process_id):
+    task = context.session.query(models.Task).filter_by(
         id=task_id).first()
-    op.host = host
+    task.host = host
+    task.process_id = process_id
 
 
 @enginefacade.reader

+ 1 - 0
coriolis/db/sqlalchemy/api.py

@@ -5,6 +5,7 @@ from oslo_db import options as db_options
 from oslo_db.sqlalchemy import session as db_session
 
 from coriolis.db.sqlalchemy import migration
+from coriolis.i18n import _
 
 CONF = cfg.CONF
 db_options.set_defaults(CONF)

+ 2 - 0
coriolis/db/sqlalchemy/migrate_repo/versions/001_initial.py

@@ -33,9 +33,11 @@ def upgrade(migrate_engine):
                           nullable=False),
         sqlalchemy.Column("instance", sqlalchemy.String(1024), nullable=False),
         sqlalchemy.Column("host", sqlalchemy.String(1024), nullable=True),
+        sqlalchemy.Column("process_id", sqlalchemy.Integer, nullable=True),
         sqlalchemy.Column("status", sqlalchemy.String(100), nullable=False),
         sqlalchemy.Column("task_type", sqlalchemy.String(100),
                           nullable=False),
+        sqlalchemy.Column("exception_details", sqlalchemy.Text, nullable=True),
         mysql_engine='InnoDB',
         mysql_charset='utf8'
     )

+ 4 - 0
coriolis/db/sqlalchemy/models.py

@@ -1,3 +1,5 @@
+import uuid
+
 from oslo_db.sqlalchemy import models
 from sqlalchemy.ext import declarative
 from sqlalchemy.orm import relationship, backref
@@ -20,8 +22,10 @@ class Task(BASE, models.TimestampMixin, models.ModelBase):
     # backref=backref("tasks"), lazy='joined')
     instance = Column(String(1024), nullable=False)
     host = Column(String(1024), nullable=True)
+    process_id = Column(Integer, nullable=True)
     status = Column(String(100), nullable=False)
     task_type = Column(String(100), nullable=False)
+    exception_details = Column(Text, nullable=True)
 
 
 class Migration(BASE, models.TimestampMixin, models.ModelBase):

+ 4 - 0
coriolis/exception.py

@@ -229,3 +229,7 @@ ObjectFieldInvalid = obj_exc.ObjectFieldInvalid
 class NotSupportedOperation(Invalid):
     message = _("Operation not supported: %(operation)s.")
     code = 405
+
+
+class TaskProcessException(CoriolisException):
+    pass

+ 5 - 0
coriolis/migrations/api.py

@@ -11,6 +11,11 @@ class API(object):
         self._rpc_client.begin_migrate_instances(
             ctxt.to_dict(), origin, destination, instances)
 
+    def stop(self, migration_id):
+        ctxt = context.CoriolisContext()
+        self._rpc_client.stop_instances_migration(
+            ctxt.to_dict(), migration_id)
+
     def get_migrations(self):
         ctxt = context.CoriolisContext()
         return self._rpc_client.get_migrations(ctxt.to_dict())

+ 44 - 2
coriolis/providers/openstack/__init__.py

@@ -1,10 +1,52 @@
+from cinderclient import client as cinder_client
+from glanceclient import client as glance_client
+from keystoneauth1 import loading
+from keystoneauth1 import session
+from neutronclient.neutron import client as neutron_client
+from novaclient import client as nova_client
+
+
 from coriolis.providers import base
 
 
 class ImportProvider(base.BaseExportProvider):
     def validate_connection_info(self, connection_info):
-        return True
+        keys = ["auth_url", "username", "password", "project_name"]
+        if connection_info.get("identity_api_version", 2) >= 3:
+            keys.append("domain_name")
+        return all(k in connection_info for k in keys)
+
+    def _create_keystone_session(self, connection_info):
+        keystone_version = connection_info.get("identity_api_version", 2)
+        auth_url = connection_info["auth_url"]
+        username = connection_info["username"]
+        password = connection_info["password"]
+        project_name = connection_info["project_name"]
+        domain_name = connection_info.get("domain_name")
+        allow_untrusted = connection_info.get("allow_untrusted", False)
+
+        # TODO: add "ca_cert" to connection_info
+        verify = not allow_untrusted
+
+        if keystone_version == 3:
+            loader = loading.get_plugin_loader('v3password')
+            auth = loader.load_from_options(
+                auth_url=auth_url,
+                username=username,
+                password=password,
+                user_domain_name=domain_name,
+                project_domain_name=domain_name,
+                project_name=project_name)
+        else:
+            loader = loading.get_plugin_loader('password')
+            auth = loader.load_from_options(
+                auth_url=auth_url,
+                username=username,
+                password=password,
+                project_name=project_name)
+
+        return session.Session(auth=auth, verify=verify)
 
     def import_instance(self, connection_info, target_environment,
                         instance_name, export_info):
-        pass
+        session = self._create_keystone_session(connection_info)

+ 5 - 3
coriolis/providers/vmware_vsphere/__init__.py

@@ -9,6 +9,7 @@ from oslo_log import log as logging
 from pyVim import connect
 from pyVmomi import vim
 
+from coriolis import constants
 from coriolis.providers import base
 from coriolis import utils
 
@@ -96,7 +97,7 @@ class ExportProvider(base.BaseExportProvider):
                 vm.ShutdownGuest()
             else:
                 task = vm.PowerOff()
-                _wait_for_task(task)
+                self._wait_for_task(task)
 
         disk_ctrls = []
         devices = [d for d in vm.config.hardware.device if
@@ -234,8 +235,9 @@ class ExportProvider(base.BaseExportProvider):
             self._convert_disk_type(path, tmp_path)
             os.remove(path)
             os.rename(tmp_path, path)
-            [d for d in disks if
-             d["id"] == disk_path["id"]][0]["path"] = os.path.abspath(path)
+            disk_info = [d for d in disks if d["id"] == disk_path["id"]][0]
+            disk_info["path"] = os.path.abspath(path)
+            disk_info["format"] = constants.DISK_FORMAT_VMDK
 
         vm_info["devices"] = {
             "nics": nics,

+ 5 - 0
coriolis/utils.py

@@ -1,5 +1,6 @@
 import socket
 import subprocess
+import traceback
 
 
 def exec_process(args):
@@ -14,3 +15,7 @@ def exec_process(args):
 
 def get_hostname():
     return socket.gethostname()
+
+
+def get_exception_details():
+    return traceback.format_exc()

+ 6 - 1
coriolis/worker/rpc/client.py

@@ -19,6 +19,11 @@ class WorkerClient(object):
             ctxt, 'export_instance', task_id=task_id, origin=origin,
             instance=instance)
 
+    def stop_task(self, ctxt, server, process_id):
+        # Needs to be executed on the same server
+        cctxt = self._client.prepare(server=server)
+        cctxt.call(ctxt, 'stop_task', process_id=process_id)
+
     def begin_import_instance(self, ctxt, server, task_id, destination,
                               instance, export_info):
         # Needs to be executed on the same server
@@ -28,5 +33,5 @@ class WorkerClient(object):
             destination=destination, instance=instance,
             export_info=export_info)
 
-    def update_migration_status(self, ctx, task_id, status):
+    def update_migration_status(self, ctxt, task_id, status):
         self._client.call(ctxt, "update_migration_status", status=status)

+ 91 - 26
coriolis/worker/rpc/server.py

@@ -1,11 +1,15 @@
 import os
+import multiprocessing
+import queue
+import shutil
 
 from oslo_config import cfg
 from oslo_log import log as logging
-import oslo_messaging as messaging
+import psutil
 
 from coriolis.conductor.rpc import client as rpc_conductor_client
 from coriolis import constants
+from coriolis import exception
 from coriolis.providers import factory
 from coriolis import utils
 
@@ -30,52 +34,113 @@ class WorkerServerEndpoint(object):
         self._rpc_conductor_client = rpc_conductor_client.ConductorClient()
 
     def _get_task_export_path(self, task_id):
-        path = os.path.join(CONF.worker.export_base_path, task_id)
-        if not os.path.exists(path):
-            os.makedirs(path)
-        return path
+        return os.path.join(CONF.worker.export_base_path, task_id)
 
-    def _cleanup_task_export_path(self, export_path):
-        if os.path.exists(export_path):
-            shtutil.rmtree(export_path)
+    def _cleanup_task_resources(self, task_id):
+        try:
+            export_path = self._get_task_export_path(task_id)
+            if os.path.exists(export_path):
+                shutil.rmtree(export_path)
+        except Exception as ex:
+            # Swallow the exception
+            LOG.exception(ex)
 
-    def export_instance(self, ctxt, task_id, origin, instance):
+    def stop_task(self, ctxt, process_id):
+        try:
+            p = psutil.Process(process_id)
+            p.kill()
+        except psutil.NoSuchProcess:
+            LOG.info("Task process not found: %s", process_id)
+
+    def _exec_task_process(self, ctxt, task_id, target, args):
+        mp_q = multiprocessing.Queue()
+        p = multiprocessing.Process(target=target, args=(args + (mp_q,)))
+
+        p.start()
+        LOG.info("Task process started: %s", task_id)
         self._rpc_conductor_client.set_task_host(
-            ctxt, task_id, self._server)
+            ctxt, task_id, self._server, p.pid)
+
+        p.join()
+
+        try:
+            result = mp_q.get(False)
+        except queue.Empty:
+            raise Exception("Task process terminated")
+
+        if isinstance(result, str):
+            raise exception.TaskProcessException(result)
+        return result
+
+    def export_instance(self, ctxt, task_id, origin, instance):
+        def _export_instance(export_provider, connection_info,
+                             instance, export_path, mp_q):
+            try:
+                vm_info = export_provider.export_instance(
+                    connection_info, instance, export_path)
+                mp_q.put(vm_info)
+            except Exception as ex:
+                mp_q.put(utils.get_exception_details())
+                LOG.exception(ex)
 
         try:
             export_provider = factory.get_provider(
                 origin["type"], constants.PROVIDER_TYPE_EXPORT)
             export_path = self._get_task_export_path(task_id)
-            vm_info = export_provider.export_instance(
-                origin["connection_info"], instance, export_path)
-            LOG.info("Exported VM: %s" % vm_info)
+            if not os.path.exists(export_path):
+                os.makedirs(export_path)
 
+            vm_info = self._exec_task_process(
+                ctxt, task_id, _export_instance,
+                (export_provider, origin["connection_info"],
+                 instance, export_path))
+
+            LOG.info("Exported VM: %s" % vm_info)
             self._rpc_conductor_client.export_completed(
                 ctxt, task_id, vm_info)
         except Exception as ex:
             LOG.exception(ex)
-            self._cleanup_task_export_path(export_path)
-            # TODO: set error state
-            # self._rpc_conductor_client.set_task_error(ctxt,
-            # task_id, ex)
+            if isinstance(ex, exception.TaskProcessException):
+                stack_trace = ex.message
+            else:
+                stack_trace = utils.get_exception_details()
+
+            self._cleanup_task_resources(task_id)
+            self._rpc_conductor_client.set_task_error(
+                ctxt, task_id, stack_trace)
 
     def import_instance(self, ctxt, task_id, destination, instance,
                         export_info):
-        self._rpc_conductor_client.set_task_host(
-            ctxt, task_id, self._server)
+        def _import_instance(import_provider, connection_info,
+                             target_environment, instance, export_info, mp_q):
+            try:
+                import_provider.import_instance(
+                    connection_info, target_environment, instance, export_info)
+                mp_q.put(None)
+            except Exception as ex:
+                mp_q.put(utils.get_exception_details())
+                LOG.exception(ex)
 
         try:
             import_provider = factory.get_provider(
                 destination["type"], constants.PROVIDER_TYPE_IMPORT)
-            import_provider.import_instance(
-                destination["connection_info"],
-                destination["target_environment"],
-                instance, export_info)
 
+            self._exec_task_process(
+                ctxt, task_id, _import_instance,
+                (import_provider, destination["connection_info"],
+                 destination["target_environment"],
+                 instance, export_info))
+
+            LOG.info("Import completed")
             self._rpc_conductor_client.import_completed(ctxt, task_id)
         except Exception as ex:
             LOG.exception(ex)
-            # TODO: set error state
-            # self._rpc_conductor_client.set_task_error(
-            # ctxt, task_id, ex)
+            if isinstance(ex, exception.TaskProcessException):
+                stack_trace = ex.message
+            else:
+                stack_trace = utils.get_exception_details()
+
+            self._rpc_conductor_client.set_task_error(
+                ctxt, task_id, stack_trace)
+        finally:
+            self._cleanup_task_resources(task_id)