Kaynağa Gözat

Adds task events

Alessandro Pilotti 10 yıl önce
ebeveyn
işleme
a116882fab

+ 2 - 2
coriolis/api/v1/migrations.py

@@ -24,12 +24,12 @@ class MigrationController(api_wsgi.Controller):
     def index(self, req):
         return migration_view.collection(
             req, self._migration_api.get_migrations(
-                req.environ['coriolis.context']))
+                req.environ['coriolis.context'], include_tasks=False))
 
     def detail(self, req):
         return migration_view.collection(
             req, self._migration_api.get_migrations(
-                req.environ['coriolis.context']))
+                req.environ['coriolis.context'], include_tasks=True))
 
     def _validate_create_body(self, body):
         migration = body["migration"]

+ 7 - 2
coriolis/conductor/rpc/client.py

@@ -10,8 +10,9 @@ class ConductorClient(object):
         target = messaging.Target(topic='coriolis_conductor', version=VERSION)
         self._client = rpc.get_client(target)
 
-    def get_migrations(self, ctxt):
-        return self._client.call(ctxt, 'get_migrations')
+    def get_migrations(self, ctxt, include_tasks=False):
+        return self._client.call(ctxt, 'get_migrations',
+                                 include_tasks=include_tasks)
 
     def get_migration(self, ctxt, migration_id):
         return self._client.call(
@@ -43,6 +44,10 @@ class ConductorClient(object):
         self._client.call(ctxt, 'set_task_error', task_id=task_id,
                           exception_details=exception_details)
 
+    def task_event(self, ctxt, task_id, level, message):
+        self._client.cast(ctxt, 'task_event', task_id=task_id, level=level,
+                          message=message)
+
     def task_progress_update(self, ctxt, task_id, current_step, total_steps,
                              message):
         self._client.cast(ctxt, 'task_progress_update', task_id=task_id,

+ 6 - 2
coriolis/conductor/rpc/server.py

@@ -17,8 +17,8 @@ class ConductorServerEndpoint(object):
     def __init__(self):
         self._rpc_worker_client = rpc_worker_client.WorkerClient()
 
-    def get_migrations(self, ctxt):
-        return db_api.get_migrations(ctxt)
+    def get_migrations(self, ctxt, include_tasks):
+        return db_api.get_migrations(ctxt, include_tasks)
 
     def get_migration(self, ctxt, migration_id):
         return db_api.get_migration(ctxt, migration_id)
@@ -159,6 +159,10 @@ class ConductorServerEndpoint(object):
         db_api.set_migration_status(
             ctxt, migration.id, constants.MIGRATION_STATUS_ERROR)
 
+    def task_event(self, ctxt, task_id, level, message):
+        LOG.info("Task event: %s", task_id)
+        db_api.add_task_event(ctxt, task_id, level, message)
+
     def task_progress_update(self, ctxt, task_id, current_step, total_steps,
                              message):
         LOG.info("Task progress update: %s", task_id)

+ 4 - 0
coriolis/constants.py

@@ -27,3 +27,7 @@ HYPERVISOR_XENSERVER = "xenserver"
 
 PLATFORM_OPENSTACK = "openstack"
 PLATFORM_VMWARE_VSPHERE = "vmware_vsphere"
+
+TASK_EVENT_INFO = "INFO"
+TASK_EVENT_WARNING = "WARNING"
+TASK_EVENT_ERROR = "ERROR"

+ 23 - 7
coriolis/db/api.py

@@ -55,17 +55,24 @@ def _soft_delete_aware_query(context, *args, **kwargs):
 
 
 @enginefacade.reader
-def get_migrations(context):
-    return _soft_delete_aware_query(context, models.Migration).options(
-        orm.joinedload("tasks")).filter_by(
-        project_id=context.tenant).all()
+def get_migrations(context, include_tasks=False):
+    q = _soft_delete_aware_query(context, models.Migration)
+    if include_tasks:
+        q = _get_migration_task_query_options(q)
+    return q.filter_by(project_id=context.tenant).all()
+
+
+def _get_migration_task_query_options(query):
+    return query.options(
+        orm.joinedload("tasks").joinedload("progress_updates")).options(
+            orm.joinedload("tasks").joinedload("events"))
 
 
 @enginefacade.reader
 def get_migration(context, migration_id):
-    return _soft_delete_aware_query(context, models.Migration).options(
-        orm.joinedload("tasks").joinedload("progress_updates")).filter_by(
-        project_id=context.tenant, id=migration_id).first()
+    q = _soft_delete_aware_query(context, models.Migration)
+    q = _get_migration_task_query_options(q)
+    return q.filter_by(project_id=context.tenant, id=migration_id).first()
 
 
 @enginefacade.writer
@@ -125,6 +132,15 @@ def get_task(context, task_id, include_migration_tasks=False):
         join_options).filter_by(id=task_id).first()
 
 
+@enginefacade.writer
+def add_task_event(context, task_id, level, message):
+    task_event = models.TaskEvent()
+    task_event.task_id = task_id
+    task_event.level = level
+    task_event.message = message
+    context.session.add(task_event)
+
+
 @enginefacade.writer
 def add_task_progress_update(context, task_id, current_step, total_steps,
                              message):

+ 18 - 0
coriolis/db/sqlalchemy/migrate_repo/versions/001_initial.py

@@ -67,10 +67,28 @@ def upgrade(migrate_engine):
         mysql_charset='utf8'
     )
 
+    task_events = sqlalchemy.Table(
+        'task_event', meta,
+        sqlalchemy.Column('id', sqlalchemy.String(36), primary_key=True,
+                          default=lambda: str(uuid.uuid4())),
+        sqlalchemy.Column('created_at', sqlalchemy.DateTime),
+        sqlalchemy.Column('updated_at', sqlalchemy.DateTime),
+        sqlalchemy.Column('deleted_at', sqlalchemy.DateTime),
+        sqlalchemy.Column('deleted', sqlalchemy.String(36)),
+        sqlalchemy.Column("task_id", sqlalchemy.String(36),
+                          sqlalchemy.ForeignKey('task.id'),
+                          nullable=False),
+        sqlalchemy.Column("level", sqlalchemy.String(50), nullable=False),
+        sqlalchemy.Column("message", sqlalchemy.String(1024), nullable=False),
+        mysql_engine='InnoDB',
+        mysql_charset='utf8'
+    )
+
     tables = (
         migration,
         task,
         task_progress_update,
+        task_events,
     )
 
     for index, table in enumerate(tables):

+ 16 - 0
coriolis/db/sqlalchemy/models.py

@@ -10,6 +10,20 @@ from coriolis.db.sqlalchemy import types
 BASE = declarative.declarative_base()
 
 
+class TaskEvent(BASE, models.TimestampMixin, models.SoftDeleteMixin,
+                models.ModelBase):
+    __tablename__ = 'task_event'
+
+    id = sqlalchemy.Column(sqlalchemy.String(36),
+                           default=lambda: str(uuid.uuid4()),
+                           primary_key=True)
+    task_id = sqlalchemy.Column(sqlalchemy.String(36),
+                                sqlalchemy.ForeignKey('task.id'),
+                                nullable=False)
+    level = sqlalchemy.Column(sqlalchemy.String(20), nullable=False)
+    message = sqlalchemy.Column(sqlalchemy.String(1024), nullable=False)
+
+
 class TaskProgressUpdate(BASE, models.TimestampMixin, models.SoftDeleteMixin,
                          models.ModelBase):
     __tablename__ = 'task_progress_update'
@@ -42,6 +56,8 @@ class Task(BASE, models.TimestampMixin, models.SoftDeleteMixin,
     task_type = sqlalchemy.Column(sqlalchemy.String(100), nullable=False)
     exception_details = sqlalchemy.Column(sqlalchemy.Text, nullable=True)
     depends_on = sqlalchemy.Column(types.List, nullable=True)
+    events = orm.relationship(TaskEvent, cascade="all,delete",
+                              backref=orm.backref('task'))
     progress_updates = orm.relationship(TaskProgressUpdate,
                                         cascade="all,delete",
                                         backref=orm.backref('task'))

+ 2 - 2
coriolis/migrations/api.py

@@ -15,8 +15,8 @@ class API(object):
     def cancel(self, ctxt, migration_id):
         self._rpc_client.cancel_migration(ctxt, migration_id)
 
-    def get_migrations(self, ctxt):
-        return self._rpc_client.get_migrations(ctxt)
+    def get_migrations(self, ctxt, include_tasks=False):
+        return self._rpc_client.get_migrations(ctxt, include_tasks)
 
     def get_migration(self, ctxt, migration_id):
         return self._rpc_client.get_migration(ctxt, migration_id)

+ 29 - 5
coriolis/providers/base.py

@@ -9,18 +9,30 @@ class Baseprovider(object):
         self._current_step = 0
         self._total_steps = None
 
-    def set_progress_update_manager(self, progress_update_manager):
-        self._progress_update_manager = progress_update_manager
+    def set_event_handler(self, event_handler):
+        self._event_handler = event_handler
 
     def _set_total_progress_steps(self, total_steps):
         self._total_steps = total_steps
 
     def _progress_update(self, message):
         self._current_step += 1
-        if self._progress_update_manager:
-            self._progress_update_manager.progress_update(
+        if self._event_handler:
+            self._event_handler.progress_update(
                 self._current_step, self._total_steps, message)
 
+    def _info(self, message):
+        if self._event_handler:
+            self._event_handler.info(message)
+
+    def _warn(self, message):
+        if self._event_handler:
+            self._event_handler.warn(message)
+
+    def _error(self, message):
+        if self._event_handler:
+            self._event_handler.error(message)
+
     @abc.abstractmethod
     def validate_connection_info(self, connection_info):
         pass
@@ -44,9 +56,21 @@ class BaseExportProvider(Baseprovider):
         pass
 
 
-class BaseProgressUpdateManager(object):
+class BaseProviderEventHandler(object):
     __metaclass__ = abc.ABCMeta
 
     @abc.abstractmethod
     def progress_update(self, current_step, total_steps, message):
         pass
+
+    @abc.abstractmethod
+    def info(self, message):
+        pass
+
+    @abc.abstractmethod
+    def warn(self, message):
+        pass
+
+    @abc.abstractmethod
+    def error(self, message):
+        pass

+ 2 - 2
coriolis/providers/openstack/__init__.py

@@ -423,8 +423,8 @@ class ImportProvider(base.BaseExportProvider):
 
             origin_network_name = nic_info.get("network_name")
             if not origin_network_name:
-                LOG.warn("Origin network name not provided for for nic: %s, "
-                         "skipping", nic_info.get("name"))
+                self._warn("Origin network name not provided for for nic: %s, "
+                           "skipping", nic_info.get("name"))
                 continue
 
             network_name = network_map.get(origin_network_name)

+ 18 - 4
coriolis/worker/rpc/server.py

@@ -30,7 +30,7 @@ TMP_DIRS_KEY = "__tmp_dirs"
 VERSION = "1.0"
 
 
-class _ConductorProgressUpdateManager(base.BaseProgressUpdateManager):
+class _ConductorProviderEventHandler(base.BaseProviderEventHandler):
     def __init__(self, ctxt, task_id):
         self._ctxt = ctxt
         self._task_id = task_id
@@ -41,6 +41,21 @@ class _ConductorProgressUpdateManager(base.BaseProgressUpdateManager):
         self._rpc_conductor_client.task_progress_update(
             self._ctxt, self._task_id, current_step, total_steps, message)
 
+    def info(self, message):
+        LOG.info(message)
+        self._rpc_conductor_client.task_event(
+            self._ctxt, self._task_id, constants.TASK_EVENT_INFO, message)
+
+    def warn(self, message):
+        LOG.warn(message)
+        self._rpc_conductor_client.task_event(
+            self._ctxt, self._task_id, constants.TASK_EVENT_WARNING, message)
+
+    def error(self, message):
+        LOG.error(message)
+        self._rpc_conductor_client.task_event(
+            self._ctxt, self._task_id, constants.TASK_EVENT_ERROR, message)
+
 
 class WorkerServerEndpoint(object):
     def __init__(self):
@@ -153,9 +168,8 @@ def _task_process(ctxt, task_id, task_type, origin, destination, instance,
                 "Unknown task type: %s" % task_type)
 
         provider = factory.get_provider(data["type"], provider_type)
-        progress_update_manager = _ConductorProgressUpdateManager(ctxt,
-                                                                  task_id)
-        provider.set_progress_update_manager(progress_update_manager)
+        event_handler = _ConductorProviderEventHandler(ctxt, task_id)
+        provider.set_event_handler(event_handler)
 
         connection_info = data.get("connection_info", {})
         target_environment = data.get("target_environment", {})