Просмотр исходного кода

Streamline task/minion pool event/progress update handling.

The old implementation of conductor task and minion pool events/progress
updates required "client-side" accouting of event/progress update
indexes and ordering, leading to excessive RPC/DB calls.
Additionally, the computation of percentages was also handled
"client-side" and simply added as a static message to the progress
update.

The new update adds an 'index' field to the DB tables for
events/progress updates which auto-increments for each task/pool, and
switches to using the 'current_step' (previously functioned as 'index')
and 'total_step' fields (previously unused) to represent the
current/total progress which can later be used to derive percentages.
Nashwan Azhari 5 лет назад
Родитель
Сommit
6adbf197c6

+ 62 - 13
coriolis/conductor/rpc/client.py

@@ -2,12 +2,16 @@
 # All Rights Reserved.
 
 from oslo_config import cfg
+from oslo_log import log as logging
 import oslo_messaging as messaging
 
 from coriolis import constants
+from coriolis import events
 from coriolis import rpc
 
+
 VERSION = "1.0"
+LOG = logging.getLogger(__name__)
 
 conductor_opts = [
     cfg.IntOpt("conductor_rpc_timeout",
@@ -276,24 +280,29 @@ class ConductorClient(rpc.BaseRPCClient):
             ctxt, 'set_task_error', task_id=task_id,
             exception_details=exception_details)
 
-    def task_event(self, ctxt, task_id, level, message):
-        self._cast(
-            ctxt, 'task_event', task_id=task_id, level=level, message=message)
-
-    def add_task_progress_update(self, ctxt, task_id, total_steps, message):
+    def add_task_event(self, ctxt, task_id, level, message):
         self._cast(
+            ctxt, 'add_task_event', task_id=task_id, level=level, message=message)
+
+    def add_task_progress_update(
+            self, ctxt, task_id, message, initial_step=0, total_steps=0,
+            return_event=False):
+        operation = self._cast
+        if return_event:
+            operation = self._call
+        return operation(
             ctxt, 'add_task_progress_update', task_id=task_id,
-            total_steps=total_steps, message=message)
+            message=message, initial_step=initial_step,
+            total_steps=total_steps)
 
-    def update_task_progress_update(self, ctxt, task_id, step,
-                                    total_steps, message):
+    def update_task_progress_update(
+            self, ctxt, task_id, progress_update_index, new_current_step,
+            new_total_steps=None, new_message=None):
         self._cast(
             ctxt, 'update_task_progress_update', task_id=task_id,
-            step=step, total_steps=total_steps, message=message)
-
-    def get_task_progress_step(self, ctxt, task_id):
-        return self._call(
-            ctxt, 'get_task_progress_step', task_id=task_id)
+            progress_update_index=progress_update_index,
+            new_current_step=new_current_step, new_total_steps=new_total_steps,
+            new_message=new_message)
 
     def create_replica_schedule(self, ctxt, replica_id,
                                 schedule, enabled, exp_date,
@@ -428,3 +437,43 @@ class ConductorClient(rpc.BaseRPCClient):
             ctxt, 'report_migration_minions_allocation_error',
             migration_id=migration_id,
             minion_allocation_error_details=minion_allocation_error_details)
+
+
+class ConductorTaskRpcEventHandler(events.BaseEventHandler):
+    def __init__(self, ctxt, task_id):
+        self._ctxt = ctxt
+        self._task_id = task_id
+
+    @property
+    def _rpc_conductor_client(self):
+        # NOTE(aznashwan): it is unsafe to fork processes with pre-instantiated
+        # oslo_messaging clients as the underlying eventlet thread queues will
+        # be invalidated.
+        return ConductorClient()
+
+    @classmethod
+    def get_progress_update_identifier(self, progress_update):
+        return progress_update['index']
+
+    def add_progress_update(
+            self, message, initial_step=0, total_steps=0, return_event=False):
+        LOG.info(
+            "Sending progress update for task '%s' to conductor: %s",
+            self._task_id, message)
+        return self._rpc_conductor_client.add_task_progress_update(
+            self._ctxt, self._task_id, message, initial_step=initial_step,
+            total_steps=total_steps, return_event=return_event)
+
+    def update_progress_update(
+            self, update_identifier, new_current_step,
+            new_total_steps=None, new_message=None):
+        LOG.info(
+            "Updating progress update '%s' for task '%s' with new step %s",
+            update_identifier, self._task_id, new_current_step)
+        self._rpc_conductor_client.update_task_progress_update(
+            self._ctxt, self._task_id, update_identifier, new_current_step,
+            new_total_steps=new_total_steps, new_message=new_message)
+
+    def add_event(self, message, level=constants.TASK_EVENT_INFO):
+        self._rpc_conductor_client.add_task_event(
+            self._ctxt, self._task_id, level, message)

+ 15 - 13
coriolis/conductor/rpc/server.py

@@ -3345,15 +3345,14 @@ class ConductorServerEndpoint(object):
             else:
                 self._cancel_tasks_execution(ctxt, execution)
 
-
             # NOTE: if this was a migration, make sure to delete
             # its associated reservation.
             if execution.type == constants.EXECUTION_TYPE_MIGRATION:
                 self._check_delete_reservation_for_transfer(action)
 
     @task_synchronized
-    def task_event(self, ctxt, task_id, level, message):
-        LOG.info("Task event: %s", task_id)
+    def add_task_event(self, ctxt, task_id, level, message):
+        LOG.info("Adding event for task '%s': %s", task_id, message)
         task = db_api.get_task(ctxt, task_id)
         if task.status not in constants.ACTIVE_TASK_STATUSES:
             raise exception.InvalidTaskState(
@@ -3364,7 +3363,8 @@ class ConductorServerEndpoint(object):
         db_api.add_task_event(ctxt, task_id, level, message)
 
     @task_synchronized
-    def add_task_progress_update(self, ctxt, task_id, total_steps, message):
+    def add_task_progress_update(
+            self, ctxt, task_id, message, initial_step=0, total_steps=0):
         LOG.info("Adding task progress update: %s", task_id)
         task = db_api.get_task(ctxt, task_id)
         if task.status not in constants.ACTIVE_TASK_STATUSES:
@@ -3374,18 +3374,20 @@ class ConductorServerEndpoint(object):
                 "Refusing progress update. The progress update string "
                 "was: %s" % (
                     task.id, task.status, task.host, message))
-        db_api.add_task_progress_update(ctxt, task_id, total_steps, message)
+        return db_api.add_task_progress_update(
+            ctxt, task_id, message, initial_step=initial_step,
+            total_steps=total_steps)
 
     @task_synchronized
-    def update_task_progress_update(self, ctxt, task_id, step,
-                                    total_steps, message):
-        LOG.info("Updating task progress update: %s", task_id)
+    def update_task_progress_update(
+            self, ctxt, task_id, progress_update_index,
+            new_current_step, new_total_steps=None, new_message=None):
+        LOG.info(
+            "Updating progress update with index '%s' for task %s: %s",
+            progress_update_index, task_id, new_current_step)
         db_api.update_task_progress_update(
-            ctxt, task_id, step, total_steps, message)
-
-    @task_synchronized
-    def get_task_progress_step(self, ctxt, task_id):
-        return db_api.get_task_progress_step(ctxt, task_id)
+            ctxt, task_id, progress_update_index, new_current_step,
+            new_total_steps=new_total_steps, new_message=new_message)
 
     def _get_replica_schedule(self, ctxt, replica_id,
                               schedule_id, expired=True):

+ 112 - 70
coriolis/db/api.py

@@ -1,6 +1,8 @@
 # Copyright 2016 Cloudbase Solutions Srl
 # All Rights Reserved.
 
+import uuid
+
 from oslo_config import cfg
 from oslo_db import api as db_api
 from oslo_db import options as db_options
@@ -709,121 +711,161 @@ def get_task(context, task_id):
 @enginefacade.writer
 def add_task_event(context, task_id, level, message):
     task_event = models.TaskEvent()
+    task_event.id = str(uuid.uuid4())
+    task_event.index = 0
+    last_event = _get_last_task_event(context, task_id)
+    if last_event:
+        task_event.index = last_event.index + 1
     task_event.task_id = task_id
     task_event.level = level
     task_event.message = message
     _session(context).add(task_event)
+    return task_event
+
+
+@enginefacade.reader
+def _get_last_task_event(context, task_id):
+    q = _soft_delete_aware_query(
+        context, models.TaskEvent)
+    last_event = q.filter(
+        models.TaskEvent.task_id == task_id).order_by(
+            models.TaskEvent.index.desc()).first()
+    return last_event
+
+
+@enginefacade.reader
+def _get_last_task_progress_update(context, task_id):
+    q = _soft_delete_aware_query(
+        context, models.TaskProgressUpdate)
+    last_update = q.filter(
+        models.TaskProgressUpdate.task_id == task_id).order_by(
+            models.TaskProgressUpdate.index.desc()).first()
+    return last_update
+
+
+@enginefacade.reader
+def _get_last_minion_pool_event(context, pool_id):
+    q = _soft_delete_aware_query(
+        context, models.MinionPoolEvent)
+    last_event = q.filter(
+        models.MinionPoolEvent.pool_id == pool_id).order_by(
+            models.MinionPoolEvent.index.desc()).first()
+    return last_event
 
 
 @enginefacade.reader
-def get_last_minion_pool_event_index(context, pool_id):
-    last_index = _model_query(
-        context, func.max(models.MinionPoolEvent.index)).filter_by(
-            pool_id=pool_id).first()[0] or 0
-    return last_index
+def _get_last_minion_pool_progress_update(context, pool_id):
+    q = _soft_delete_aware_query(
+        context, models.MinionPoolProgressUpdate)
+    last_event = q.filter(
+        models.MinionPoolProgressUpdate.pool_id == pool_id).order_by(
+            models.MinionPoolProgressUpdate.index.desc()).first()
+    return last_event
 
 
 @enginefacade.writer
 def add_minion_pool_event(context, pool_id, level, message):
     pool_event = models.MinionPoolEvent()
+    pool_event.id = str(uuid.uuid4())
     pool_event.pool_id = pool_id
     pool_event.level = level
     pool_event.message = message
-    pool_event.index = (
-        get_last_minion_pool_event_index(context, pool_id) + 1)
+
+    pool_event.index = 0
+    last_pool_event = _get_last_minion_pool_event(context, pool_id)
+    if last_pool_event:
+        pool_event.index = last_pool_event.index + 1
+
     _session(context).add(pool_event)
+    return pool_event
 
 
-def _get_minion_pool_progress_update(context, pool_id, current_step):
+def _get_minion_pool_progress_update(context, pool_id, index):
     q = _soft_delete_aware_query(context, models.MinionPoolProgressUpdate)
     return q.filter(
         models.MinionPoolProgressUpdate.pool_id == pool_id,
-        models.MinionPoolProgressUpdate.current_step == current_step).first()
-
-
-@enginefacade.reader
-def get_minion_pool_progress_step(context, pool_id):
-    curr_step = 0
-    q = _soft_delete_aware_query(context, models.MinionPoolProgressUpdate)
-    last_step = q.filter(
-        models.MinionPoolProgressUpdate.pool_id == pool_id).order_by(
-            models.MinionPoolProgressUpdate.current_step.desc()).first()
-
-    if last_step:
-        curr_step = last_step.current_step
-
-    return curr_step
+        models.MinionPoolProgressUpdate.index == index).first()
 
 
 @enginefacade.writer
-def add_minion_pool_progress_update(context, pool_id, total_steps, message):
-    current_step = get_minion_pool_progress_step(context, pool_id) + 1
-    pool_progress_update = models.MinionPoolProgressUpdate(
-        pool_id=pool_id, current_step=current_step, total_steps=total_steps,
-        message=message)
+def add_minion_pool_progress_update(
+        context, pool_id, message, initial_step=0, total_steps=0):
+    pool_progress_update = models.MinionPoolProgressUpdate()
+    pool_progress_update.id = str(uuid.uuid4())
+    pool_progress_update.pool_id = pool_id
+    pool_progress_update.current_step = initial_step
+    pool_progress_update.total_steps = total_steps
+    pool_progress_update.message = message
+    pool_progress_update.index = 0
+    last_progress_update = _get_last_minion_pool_progress_update(
+        context, pool_id)
+    if last_progress_update:
+        pool_progress_update.index = last_progress_update.index + 1
+
     _session(context).add(pool_progress_update)
+    return pool_progress_update
 
 
 @enginefacade.writer
 def update_minion_pool_progress_update(
-        context, pool_id, step, total_steps, message):
+        context, pool_id, update_index, new_current_step,
+        new_total_steps=None, new_message=None):
     pool_progress_update = _get_minion_pool_progress_update(
-        context, pool_id, step)
+        context, pool_id, update_index)
     if not pool_progress_update:
-        pool_progress_update = models.MinionPoolProgressUpdate(
-            pool_id=pool_id, current_step=step, total_steps=total_steps,
-            message=message)
-        _session(context).add(pool_progress_update)
+        raise exception.NotFound(
+            "Could not find progress update for minion pool with ID '%s' and "
+            "index %s in the DB for updating." % (pool_id, update_index))
 
-    pool_progress_update.pool_id = pool_id
-    pool_progress_update.current_step = step
-    pool_progress_update.total_steps = total_steps
-    pool_progress_update.message = message
+    pool_progress_update.current_step = new_current_step
+    if new_total_steps is not None:
+        pool_progress_update.total_steps = new_total_steps
+    if new_message is not None:
+        pool_progress_update.message = new_message
+    return pool_progress_update
 
 
-def _get_progress_update(context, task_id, current_step):
+def _get_progress_update(context, task_id, index):
     q = _soft_delete_aware_query(context, models.TaskProgressUpdate)
     return q.filter(
         models.TaskProgressUpdate.task_id == task_id,
-        models.TaskProgressUpdate.current_step == current_step).first()
-
+        models.TaskProgressUpdate.index == index).first()
 
-@enginefacade.reader
-def get_task_progress_step(context, task_id):
-    curr_step = 0
-    q = _soft_delete_aware_query(context, models.TaskProgressUpdate)
-    last_step = q.filter(
-        models.TaskProgressUpdate.task_id == task_id).order_by(
-            models.TaskProgressUpdate.current_step.desc()).first()
 
-    if last_step:
-        curr_step = last_step.current_step
-
-    return curr_step
+@enginefacade.writer
+def add_task_progress_update(
+        context, task_id, message, initial_step=0, total_steps=0):
+    task_progress_update = models.TaskProgressUpdate()
+    task_progress_update.id = str(uuid.uuid4())
+    task_progress_update.task_id = task_id
+    task_progress_update.current_step = initial_step
+    task_progress_update.total_steps = total_steps
+    task_progress_update.message = message
 
+    task_progress_update.index = 0
+    last_progress_update = _get_last_task_progress_update(context, task_id)
+    if last_progress_update:
+        task_progress_update.index = last_progress_update.index + 1
 
-@enginefacade.writer
-def add_task_progress_update(context, task_id, total_steps, message):
-    current_step = get_task_progress_step(context, task_id) + 1
-    task_progress_update = models.TaskProgressUpdate(
-        task_id=task_id, current_step=current_step, total_steps=total_steps,
-        message=message)
     _session(context).add(task_progress_update)
+    return task_progress_update
 
 
 @enginefacade.writer
-def update_task_progress_update(context, task_id, step, total_steps, message):
-    task_progress_update = _get_progress_update(context, task_id, step)
+def update_task_progress_update(
+        context, task_id, update_index, new_current_step,
+        new_total_steps=None, new_message=None):
+    task_progress_update = _get_progress_update(context, task_id, update_index)
     if not task_progress_update:
-        task_progress_update = models.TaskProgressUpdate(
-            task_id=task_id, current_step=step, total_steps=total_steps,
-            message=message)
-        _session(context).add(task_progress_update)
+        raise exception.NotFound(
+            "Could not find progress update for task with ID '%s' and "
+            "index %s in the DB for updating." % (task_id, update_index))
 
-    task_progress_update.task_id = task_id
-    task_progress_update.current_step = step
-    task_progress_update.total_steps = total_steps
-    task_progress_update.message = message
+    task_progress_update.current_step = new_current_step
+    if new_total_steps is not None:
+        task_progress_update.total_steps = new_total_steps
+    if new_message is not None:
+        task_progress_update.message = new_message
 
 
 @enginefacade.writer
@@ -934,7 +976,7 @@ def add_endpoint_region_mapping(context, endpoint_region_mapping):
 def get_endpoint_region_mapping(context, endpoint_id, region_id):
     q = _soft_delete_aware_query(context, models.EndpointRegionMapping)
     q = q.filter(
-        models.EndpointRegionMapping.region == region_id)
+        models.EndpointRegionMapping.region_id == region_id)
     q = q.filter(
         models.EndpointRegionMapping.endpoint_id == endpoint_id)
     return q.all()
@@ -1140,7 +1182,7 @@ def add_service_region_mapping(context, service_region_mapping):
 def get_service_region_mapping(context, service_id, region_id):
     q = _soft_delete_aware_query(context, models.ServiceRegionMapping)
     q = q.filter(
-        models.ServiceRegionMapping.region == region_id)
+        models.ServiceRegionMapping.region_id == region_id)
     q = q.filter(
         models.ServiceRegionMapping.service_id == service_id)
     return q.all()

+ 4 - 2
coriolis/db/sqlalchemy/migrate_repo/versions/016_adds_minion_vm_pools.py

@@ -128,12 +128,14 @@ def upgrade(migrate_engine):
         sqlalchemy.Column('created_at', sqlalchemy.DateTime),
         sqlalchemy.Column('updated_at', sqlalchemy.DateTime),
         sqlalchemy.Column('deleted_at', sqlalchemy.DateTime),
+        sqlalchemy.Column('index', sqlalchemy.Integer, default=0),
         sqlalchemy.Column('deleted', sqlalchemy.String(36)),
         sqlalchemy.Column("pool_id", sqlalchemy.String(36),
                           sqlalchemy.ForeignKey('minion_pool.id'),
                           nullable=False),
-        sqlalchemy.Column("current_step", sqlalchemy.Integer, nullable=False),
-        sqlalchemy.Column("total_steps", sqlalchemy.Integer, nullable=True),
+        sqlalchemy.Column(
+            "current_step", sqlalchemy.BigInteger, nullable=False),
+        sqlalchemy.Column("total_steps", sqlalchemy.BigInteger, nullable=True),
         sqlalchemy.Column("message", sqlalchemy.Text, nullable=True),
         mysql_engine='InnoDB',
         mysql_charset='utf8'))

+ 20 - 0
coriolis/db/sqlalchemy/migrate_repo/versions/017_adds_task_progress_idices.py

@@ -0,0 +1,20 @@
+import sqlalchemy
+from sqlalchemy import types
+
+
+def upgrade(migrate_engine):
+    meta = sqlalchemy.MetaData()
+    meta.bind = migrate_engine
+
+    task_event = sqlalchemy.Table('task_event', meta, autoload=True)
+    event_index = sqlalchemy.Column(
+        "index", sqlalchemy.Integer, default=0, nullable=False)
+    task_event.create_column(event_index)
+
+    task_progress_update = sqlalchemy.Table(
+        'task_progress_update', meta, autoload=True)
+    progress_index = sqlalchemy.Column(
+        "index", sqlalchemy.Integer, default=0, nullable=False)
+    task_progress_update.create_column(progress_index)
+    task_progress_update.c.current_step.alter(type=sqlalchemy.BigInteger)
+    task_progress_update.c.total_steps.alter(type=sqlalchemy.BigInteger)

+ 20 - 11
coriolis/db/sqlalchemy/models.py

@@ -26,6 +26,7 @@ class TaskEvent(BASE, models.TimestampMixin, models.SoftDeleteMixin,
                                 sqlalchemy.ForeignKey('task.id'),
                                 nullable=False)
     level = sqlalchemy.Column(sqlalchemy.String(20), nullable=False)
+    index = sqlalchemy.Column(sqlalchemy.Integer, nullable=False)
     message = sqlalchemy.Column(sqlalchemy.String(1024), nullable=False)
 
     def to_dict(self):
@@ -34,6 +35,7 @@ class TaskEvent(BASE, models.TimestampMixin, models.SoftDeleteMixin,
             "task_id": self.task_id,
             "level": self.level,
             "message": self.message,
+            "index": self.index,
             "created_at": self.created_at,
             "updated_at": self.updated_at,
             "deleted_at": self.deleted_at,
@@ -75,7 +77,7 @@ class TaskProgressUpdate(BASE, models.TimestampMixin, models.SoftDeleteMixin,
                          models.ModelBase):
     __tablename__ = 'task_progress_update'
     __table_args__ = (
-        schema.UniqueConstraint("task_id", "current_step", "deleted"),)
+        schema.UniqueConstraint("task_id", "index", "deleted"),)
 
     id = sqlalchemy.Column(sqlalchemy.String(36),
                            default=lambda: str(uuid.uuid4()),
@@ -83,14 +85,17 @@ class TaskProgressUpdate(BASE, models.TimestampMixin, models.SoftDeleteMixin,
     task_id = sqlalchemy.Column(sqlalchemy.String(36),
                                 sqlalchemy.ForeignKey('task.id'),
                                 nullable=False)
-    current_step = sqlalchemy.Column(sqlalchemy.Integer, nullable=False)
-    total_steps = sqlalchemy.Column(sqlalchemy.Integer, nullable=True)
+
+    index = sqlalchemy.Column(sqlalchemy.Integer, nullable=False)
+    current_step = sqlalchemy.Column(sqlalchemy.BigInteger, nullable=False)
+    total_steps = sqlalchemy.Column(sqlalchemy.BigInteger, nullable=True)
     message = sqlalchemy.Column(sqlalchemy.Text, nullable=True)
 
     def to_dict(self):
         result = {
             "id": self.id,
             "task_id": self.task_id,
+            "index": self.index,
             "current_step": self.current_step,
             "total_steps": self.total_steps,
             "message": self.message,
@@ -106,7 +111,7 @@ class MinionPoolProgressUpdate(
         BASE, models.TimestampMixin, models.SoftDeleteMixin, models.ModelBase):
     __tablename__ = 'minion_pool_progress_update'
     __table_args__ = (
-        schema.UniqueConstraint("pool_id", "current_step", "deleted"),)
+        schema.UniqueConstraint("pool_id", "index", "deleted"),)
 
     id = sqlalchemy.Column(sqlalchemy.String(36),
                            default=lambda: str(uuid.uuid4()),
@@ -114,14 +119,16 @@ class MinionPoolProgressUpdate(
     pool_id = sqlalchemy.Column(sqlalchemy.String(36),
                                 sqlalchemy.ForeignKey('minion_pool.id'),
                                 nullable=False)
-    current_step = sqlalchemy.Column(sqlalchemy.Integer, nullable=False)
-    total_steps = sqlalchemy.Column(sqlalchemy.Integer, nullable=True)
+    index = sqlalchemy.Column(sqlalchemy.Integer, nullable=False)
+    current_step = sqlalchemy.Column(sqlalchemy.BigInteger, nullable=False)
+    total_steps = sqlalchemy.Column(sqlalchemy.BigInteger, nullable=True)
     message = sqlalchemy.Column(sqlalchemy.String(1024), nullable=True)
 
     def to_dict(self):
         result = {
             "id": self.id,
             "pool_id": self.pool_id,
+            "index": self.index,
             "current_step": self.current_step,
             "total_steps": self.total_steps,
             "message": self.message,
@@ -150,17 +157,18 @@ class Task(BASE, models.TimestampMixin, models.SoftDeleteMixin,
     task_type = sqlalchemy.Column(sqlalchemy.String(100), nullable=False)
     exception_details = sqlalchemy.Column(sqlalchemy.Text, nullable=True)
     depends_on = sqlalchemy.Column(types.List, nullable=True)
-    index = sqlalchemy.Column(sqlalchemy.Integer, nullable=True)
+    index = sqlalchemy.Column(sqlalchemy.Integer, nullable=False)
     on_error = sqlalchemy.Column(sqlalchemy.Boolean, nullable=False)
     # TODO(alexpilotti): Add soft delete filter
     events = orm.relationship(TaskEvent, cascade="all,delete",
-                              backref=orm.backref('task'))
+                              backref=orm.backref('task'),
+                              order_by=TaskEvent.index)
     # TODO(alexpilotti): Add soft delete filter
     progress_updates = orm.relationship(TaskProgressUpdate,
                                         cascade="all,delete",
                                         backref=orm.backref('task'),
                                         order_by=(
-                                            TaskProgressUpdate.current_step))
+                                            TaskProgressUpdate.index))
 
     def to_dict(self):
         result = {
@@ -572,12 +580,13 @@ class MinionPool(
         primaryjoin="and_(MinionMachine.pool_id==MinionPool.id, "
                     "MinionMachine.deleted=='0')")
     events = orm.relationship(MinionPoolEvent, cascade="all,delete",
-                              backref=orm.backref('minion_pool'))
+                              backref=orm.backref('minion_pool'),
+                              order_by=MinionPoolEvent.index)
     progress_updates = orm.relationship(MinionPoolProgressUpdate,
                                         cascade="all,delete",
                                         backref=orm.backref('minion_pool'),
                                         order_by=(
-                                            MinionPoolProgressUpdate.current_step))
+                                            MinionPoolProgressUpdate.index))
 
     def to_dict(
             self, include_machines=True, include_events=True,

+ 53 - 55
coriolis/events.py

@@ -7,95 +7,93 @@ import collections
 from oslo_log import log as logging
 from six import with_metaclass
 
+from coriolis import constants
+
 
 LOG = logging.getLogger(__name__)
 
 _PercStepData = collections.namedtuple(
-    "_PercStepData", "last_value max_value perc_threshold message_format")
+    "_PercStepData", "progress_update_id last_value total_steps")
 
 
 class EventManager(object, with_metaclass(abc.ABCMeta)):
 
     def __init__(self, event_handler):
         self._event_handler = event_handler
-        self._total_steps = None
-        self._percentage_steps = {}
-
-    def set_total_progress_steps(self, total_steps):
-        self._total_steps = total_steps
 
-    def add_percentage_step(self, max_value, perc_threshold=1,
-                            message_format="{:.0f}%"):
-        if max_value < 0:
+    def _call_event_handler(self, method_name, *args, **kwargs):
+        if self._event_handler:
+            method_obj = getattr(self._event_handler, str(method_name), None)
+            if not method_obj:
+                raise AttributeError(
+                    "No method named '%s' for event handler of type '%s'." % (
+                        method_name, type(self._event_handler)))
+            return method_obj(*args, **kwargs)
+
+    def add_percentage_step(self, message, total_steps, initial_step=0):
+        if total_steps < 0:
             LOG.warn(
                 "Max percentage value was negative (%s). Reset to 0",
-                max_value)
-            max_value = 0
-        if max_value == 0:
+                total_steps)
+            total_steps = 0
+        if total_steps == 0:
             LOG.warn("Max percentage value set to 0 (zero)")
-        current_step = self._event_handler.get_task_progress_step() + 1
-        self._percentage_steps[current_step] = _PercStepData(
-            0, max_value, perc_threshold, message_format)
-        return current_step
-
-    def set_percentage_step(self, step, value):
-        step_data = self._percentage_steps[step]
-
-        old_perc = 100
-        perc = 100
-        if step_data.max_value != 0:
-            old_perc = (step_data.last_value * 100 / step_data.max_value //
-                        step_data.perc_threshold * step_data.perc_threshold)
-            perc = (value * 100 / step_data.max_value //
-                    step_data.perc_threshold * step_data.perc_threshold)
-
-        if perc > old_perc and self._event_handler:
-            self._event_handler.update_task_progress_update(
-                step, self._total_steps, step_data.message_format.format(perc))
-            self._percentage_steps[step] = _PercStepData(
-                value, step_data.max_value, step_data.perc_threshold,
-                step_data.message_format)
+
+        if initial_step > total_steps:
+            raise ValueError(
+                "Provided percent step initial value '%s' is larger than the "
+                "maximum value '%s'" % (initial_step, total_steps))
+        progress_update = self._call_event_handler(
+            'add_progress_update', message, initial_step=initial_step,
+            total_steps=total_steps, return_event=True)
+        progress_update_id = (
+            self._call_event_handler(
+                'get_progress_update_identifier', progress_update))
+
+        return _PercStepData(progress_update_id, initial_step, total_steps)
+
+    def set_percentage_step(self, step, new_current_step):
+        self._call_event_handler(
+            'update_progress_update', step.progress_update_id,
+            new_current_step)
 
     def progress_update(self, message):
-        if self._event_handler:
-            self._event_handler.add_task_progress_update(
-                self._total_steps, message)
+        self._call_event_handler(
+            'add_progress_update', message, return_event=False)
 
     def info(self, message):
-        if self._event_handler:
-            self._event_handler.info(message)
+        self._call_event_handler(
+            'add_event', message, level=constants.TASK_EVENT_INFO)
 
     def warn(self, message):
-        if self._event_handler:
-            self._event_handler.warn(message)
+        self._call_event_handler(
+            'add_event', message, level=constants.TASK_EVENT_WARNING)
 
     def error(self, message):
-        if self._event_handler:
-            self._event_handler.error(message)
+        self._call_event_handler(
+            'add_event', message, level=constants.TASK_EVENT_ERROR)
 
 
 class BaseEventHandler(object, with_metaclass(abc.ABCMeta)):
 
     @abc.abstractmethod
-    def add_task_progress_update(self, total_steps, message):
+    def add_progress_update(
+            self, message, initial_step=0, total_steps=0,
+            return_event=False):
         pass
 
     @abc.abstractmethod
-    def update_task_progress_update(self, step, total_steps, message):
+    def update_progress_update(
+            self, update_identifier, new_current_step,
+            new_total_steps=None, new_message=None):
         pass
 
+    @classmethod
     @abc.abstractmethod
-    def get_task_progress_step(self):
+    def get_progress_update_identifier(cls, progress_update):
+        """ Returns the identifier for a given progress update. """
         pass
 
     @abc.abstractmethod
-    def info(self, message):
-        pass
-
-    @abc.abstractmethod
-    def warn(self, message):
-        pass
-
-    @abc.abstractmethod
-    def error(self, message):
+    def add_event(self, message, level=constants.TASK_EVENT_INFO):
         pass

+ 1 - 3
coriolis/migrations/manager.py

@@ -27,9 +27,7 @@ def _copy_volume(volume, disk_image_reader, backup_writer, event_manager):
             disk_size = reader.disk_size
 
             perc_step = event_manager.add_percentage_step(
-                disk_size,
-                message_format="Disk copy progress for %s: "
-                               "{:.0f}%%" % disk_id)
+                "Copying data of disk %s" % disk_id, disk_size)
 
             offset = 0
             max_block_size = 10 * units.Mi  # 10 MB

+ 61 - 39
coriolis/minion_manager/rpc/client.py

@@ -2,20 +2,25 @@
 # All Rights Reserved.
 
 from oslo_config import cfg
+from oslo_log import log as logging
 import oslo_messaging as messaging
 
+from coriolis import constants
+from coriolis import events
 from coriolis import rpc
 
+
 VERSION = "1.0"
+LOG = logging.getLogger(__name__)
 
-scheduler_opts = [
+MINION_MANAGER_OPTS = [
     cfg.IntOpt("minion_mananger_rpc_timeout",
                help="Number of seconds until RPC calls to the "
                     "minion manager timeout.")
 ]
 
 CONF = cfg.CONF
-CONF.register_opts(scheduler_opts, 'minion_manager')
+CONF.register_opts(MINION_MANAGER_OPTS, 'minion_manager')
 
 
 class MinionManagerClient(rpc.BaseRPCClient):
@@ -28,23 +33,25 @@ class MinionManagerClient(rpc.BaseRPCClient):
             target, timeout=timeout)
 
     def add_minion_pool_progress_update(
-            self, ctxt, minion_pool_id, total_steps, message):
-        return self._cast(
+            self, ctxt, minion_pool_id, message, initial_step=0, total_steps=0,
+            return_event=False):
+        operation = self._cast
+        if return_event:
+            operation = self._call
+        return operation(
             ctxt, 'add_minion_pool_progress_update',
-            minion_pool_id=minion_pool_id,
-            total_steps=total_steps, message=message)
+            minion_pool_id=minion_pool_id, message=message,
+            initial_step=initial_step, total_steps=total_steps)
 
     def update_minion_pool_progress_update(
-            self, ctxt, minion_pool_id, step, total_steps, message):
-        return self._cast(
+            self, ctxt, minion_pool_id, progress_update_index, new_current_step,
+            new_total_steps=None, new_message=None):
+        self._cast(
             ctxt, 'update_minion_pool_progress_update',
             minion_pool_id=minion_pool_id,
-            step=step, total_steps=total_steps, message=message)
-
-    def get_minion_pool_progress_step(self, ctxt, minion_pool_id):
-        return self._cast(
-            ctxt, 'get_minion_pool_progress_step',
-            minion_pool_id=minion_pool_id)
+            progress_update_index=progress_update_index,
+            new_current_step=new_current_step, new_total_steps=new_total_steps,
+            new_message=new_message)
 
     def add_minion_pool_event(self, ctxt, minion_pool_id, level, message):
         return self._cast(
@@ -141,31 +148,6 @@ class MinionManagerClient(rpc.BaseRPCClient):
         return self._call(
             ctxt, 'delete_minion_pool', minion_pool_id=minion_pool_id)
 
-    def get_minion_pool_lifecycle_executions(
-            self, ctxt, minion_pool_id, include_tasks=False):
-        return self._call(
-            ctxt, 'get_minion_pool_lifecycle_executions',
-            minion_pool_id=minion_pool_id, include_tasks=include_tasks)
-
-    def get_minion_pool_lifecycle_execution(
-            self, ctxt, minion_pool_id, execution_id):
-        return self._call(
-            ctxt, 'get_minion_pool_lifecycle_execution',
-            minion_pool_id=minion_pool_id, execution_id=execution_id)
-
-    def delete_minion_pool_lifecycle_execution(
-            self, ctxt, minion_pool_id, execution_id):
-        return self._call(
-            ctxt, 'delete_minion_pool_lifecycle_execution',
-            minion_pool_id=minion_pool_id, execution_id=execution_id)
-
-    def cancel_minion_pool_lifecycle_execution(
-            self, ctxt, minion_pool_id, execution_id, force):
-        return self._call(
-            ctxt, 'cancel_minion_pool_lifecycle_execution',
-            minion_pool_id=minion_pool_id, execution_id=execution_id,
-            force=force)
-
     def get_endpoint_source_minion_pool_options(
             self, ctxt, endpoint_id, env, option_names):
         return self._call(
@@ -189,3 +171,43 @@ class MinionManagerClient(rpc.BaseRPCClient):
         return self._call(
             ctxt, 'validate_endpoint_destination_minion_pool_options',
             endpoint_id=endpoint_id, pool_environment=pool_environment)
+
+
+class MinionManagerPoolRpcEventHandler(events.BaseEventHandler):
+    def __init__(self, ctxt, pool_id):
+        self._ctxt = ctxt
+        self._pool_id = pool_id
+
+    @property
+    def _rpc_minion_manager_client(self):
+        # NOTE(aznashwan): it is unsafe to fork processes with pre-instantiated
+        # oslo_messaging clients as the underlying eventlet thread queues will
+        # be invalidated.
+        return MinionManagerClient()
+
+    @classmethod
+    def get_progress_update_identifier(self, progress_update):
+        return progress_update['index']
+
+    def add_progress_update(
+            self, message, initial_step=0, total_steps=0, return_event=False):
+        LOG.info(
+            "Sending progress update for pool '%s' to minion manager : %s",
+            self._pool_id, message)
+        return self._rpc_minion_manager_client.add_minion_pool_progress_update(
+            self._ctxt, self._pool_id, message, initial_step=initial_step,
+            total_steps=total_steps, return_event=return_event)
+
+    def update_progress_update(
+            self, update_identifier, new_current_step,
+            new_total_steps=None, new_message=None):
+        LOG.info(
+            "Updating progress update '%s' for pool '%s' with new step %s",
+            update_identifier, self._pool_id, new_current_step)
+        self._rpc_minion_manager_client.update_minion_pool_progress_update(
+            self._ctxt, self._pool_id, update_identifier, new_current_step,
+            new_total_steps=new_total_steps, new_message=new_message)
+
+    def add_event(self, message, level=constants.TASK_EVENT_INFO):
+        self._rpc_minion_manager_client.add_minion_pool_event(
+            self._ctxt, self._pool_id, level, message)

+ 13 - 11
coriolis/minion_manager/rpc/server.py

@@ -294,28 +294,30 @@ class MinionManagerServerEndpoint(object):
         self._add_minion_pool_event(ctxt, minion_pool_id, level, message)
 
     def _add_minion_pool_progress_update(
-            self, ctxt, minion_pool_id, total_steps, message):
+            self, ctxt, minion_pool_id, message, initial_step=0, total_steps=0):
         LOG.info(
             "Adding pool progress update for %s: %s", minion_pool_id, message)
         db_api.add_minion_pool_progress_update(
-            ctxt, minion_pool_id, total_steps, message)
+            ctxt, minion_pool_id, message, initial_step=initial_step,
+            total_steps=total_steps)
 
     @minion_manager_utils.minion_pool_synchronized_op
     def add_minion_pool_progress_update(
-            self, ctxt, minion_pool_id, total_steps, message):
+            self, ctxt, minion_pool_id, message, initial_step=0, total_steps=0):
         self._add_minion_pool_progress_update(
-            ctxt, minion_pool_id, total_steps, message)
+            ctxt, minion_pool_id, message, initial_step=initial_step,
+            total_steps=total_steps)
 
     @minion_manager_utils.minion_pool_synchronized_op
     def update_minion_pool_progress_update(
-            self, ctxt, minion_pool_id, step, total_steps, message):
-        LOG.info("Updating minion pool progress update: %s", minion_pool_id)
+            self, ctxt, minion_pool_id, progress_update_index,
+            new_current_step, new_total_steps=None, new_message=None):
+        LOG.info(
+            "Updating minion pool '%s' progress update '%s': %s",
+            minion_pool_id, progress_update_index, new_current_step)
         db_api.update_minion_pool_progress_update(
-            ctxt, minion_pool_id, step, total_steps, message)
-
-    @minion_manager_utils.minion_pool_synchronized_op
-    def get_minion_pool_progress_step(self, ctxt, minion_pool_id):
-        return db_api.get_minion_pool_progress_step(ctxt, minion_pool_id)
+            ctxt, minion_pool_id, progress_update_index, new_current_step,
+            new_total_steps=new_total_steps, new_message=new_message)
 
     def _check_keys_for_action_dict(
             self, action, required_action_properties, operation=None):

+ 8 - 11
coriolis/providers/replicator.py

@@ -434,10 +434,8 @@ class Replicator(object):
                 perc_step = perc_steps.get(devName)
                 if perc_step is None:
                     perc_step = self._event_manager.add_percentage_step(
-                        100,
-                        message_format=(
-                            "Chunking progress for disk %s (%.2f MB): "
-                            "{:.0f}%%") % (devName, dev_size))
+                        "Performing chunking for disk %s (total size %.2f MB)" % (
+                            devName, dev_size), 100)
                     perc_steps[devName] = perc_step
                 perc_done = vol["checksum-status"]["percentage"]
                 self._event_manager.set_percentage_step(
@@ -801,11 +799,11 @@ class Replicator(object):
             size = self._get_size_from_chunks(chunks)
 
             msg = (
-                "Disk replication progress for disk \"%s\" (device \"%s\" "
-                "written chunks: %.2f MB): {:.0f}%%") % (
+                "Replicating changed data for disk \"%s\" (device \"%s\", "
+                "written chunks: %.2f MB)") % (
                     volume["disk_id"], devName, size)
             perc_step = self._event_manager.add_percentage_step(
-                len(chunks), message_format=msg)
+                msg, len(chunks))
 
             total = 0
             with backup_writer.open("", volume['disk_id']) as destination:
@@ -829,8 +827,7 @@ class Replicator(object):
         size = self._cli.get_disk_size(disk)
 
         perc_step = self._event_manager.add_percentage_step(
-            size, message_format="Downloading disk /dev/%s : "
-            "{:.0f}%%" % disk)
+            "Downloading full disk /dev/%s" % disk, size)
 
         total = 0
         with self._cli._cli.get(diskUri, stream=True,
@@ -853,8 +850,8 @@ class Replicator(object):
             # create sparse file
             fp.truncate(size)
             perc_step = self._event_manager.add_percentage_step(
-                len(chunks), message_format="Disk download progress for "
-                "/dev/%s (%s MB): {:.0f}%%" % (disk, size_from_chunks))
+                "Downloading spart disk /dev/%s (%s MB)" % (
+                    disk, size_from_chunks), len(chunks))
             for chunk in chunks:
                 offset = int(chunk["offset"])
                 # seek to offset

+ 3 - 77
coriolis/worker/rpc/server.py

@@ -37,88 +37,14 @@ LOG = logging.getLogger(__name__)
 VERSION = "1.0"
 
 
-class _ConductorProviderEventHandler(events.BaseEventHandler):
-    def __init__(self, ctxt, task_id):
-        self._ctxt = ctxt
-        self._task_id = task_id
-        self._rpc_conductor_client = rpc_conductor_client.ConductorClient()
-
-    def add_task_progress_update(self, total_steps, message):
-        LOG.info("Progress update: %s", message)
-        self._rpc_conductor_client.add_task_progress_update(
-            self._ctxt, self._task_id, total_steps, message)
-
-    def update_task_progress_update(self, step, total_steps, message):
-        LOG.info("Progress update: %s", message)
-        self._rpc_conductor_client.update_task_progress_update(
-            self._ctxt, self._task_id, step, total_steps, message)
-
-    def get_task_progress_step(self):
-        return self._rpc_conductor_client.get_task_progress_step(
-            self._ctxt, self._task_id)
-
-    def info(self, message):
-        LOG.info(message)
-        self._rpc_conductor_client.task_event(
-            self._ctxt, self._task_id, constants.TASK_EVENT_INFO, message)
-
-    def warn(self, message):
-        LOG.warn(message)
-        self._rpc_conductor_client.task_event(
-            self._ctxt, self._task_id, constants.TASK_EVENT_WARNING, message)
-
-    def error(self, message):
-        LOG.error(message)
-        self._rpc_conductor_client.task_event(
-            self._ctxt, self._task_id, constants.TASK_EVENT_ERROR, message)
-
-
-class _MinionPoolManagerProviderEventHandler(events.BaseEventHandler):
-    def __init__(self, ctxt, pool_id):
-        self._ctxt = ctxt
-        self._pool_id = pool_id
-        self._rpc_minion_manager_client = (
-            rpc_minion_manager_client.MinionManagerClient())
-
-    def add_task_progress_update(self, total_steps, message):
-        LOG.info(
-            "Minion pool '%s' progress update: %s", self._pool_id, message)
-        self._rpc_minion_manager_client.add_minion_pool_progress_update(
-            self._ctxt, self._pool_id, total_steps, message)
-
-    def update_task_progress_update(self, step, total_steps, message):
-        LOG.info(
-            "Minion pool '%s' progress update: %s", self._pool_id, message)
-        self._rpc_minion_manager_client.update_minion_pool_progress_update(
-            self._ctxt, self._pool_id, step, total_steps, message)
-
-    def get_task_progress_step(self):
-        return self._rpc_minion_manager_client.get_minion_pool_progress_step(
-            self._ctxt, self._pool_id)
-
-    def info(self, message):
-        LOG.info(message)
-        self._rpc_minion_manager_client.add_minion_pool_event(
-            self._ctxt, self._pool_id, constants.MINION_POOL_EVENT_INFO, message)
-
-    def warn(self, message):
-        LOG.warn(message)
-        self._rpc_minion_manager_client.add_minion_pool_event(
-            self._ctxt, self._pool_id, constants.MINION_POOL_EVENT_WARNING, message)
-
-    def error(self, message):
-        LOG.error(message)
-        self._rpc_minion_manager_client.add_minion_pool_event(
-            self._ctxt, self._pool_id, constants.MINION_POOL_EVENT_ERROR, message)
-
-
 # TODO(aznashwan): parametrize the event handler provided during task execution
 # to decouple what gets notified from the task running logic itself:
 def _get_event_handler_for_task_type(task_type, ctxt, task_object_id):
     if task_type in constants.MINION_POOL_OPERATIONS_TASKS:
-        return _MinionPoolManagerProviderEventHandler(
+        return rpc_minion_manager_client.MinionManagerPoolRpcEventHandler(
             ctxt, task_object_id)
-    return _ConductorProviderEventHandler(ctxt, task_object_id)
+    return rpc_conductor_client.ConductorTaskRpcEventHandler(
+        ctxt, task_object_id)
 
 
 class WorkerServerEndpoint(object):