Просмотр исходного кода

Fix clobbering of Task progress updates

- Mainly fixed by sorting progress updates by current_step
- current_step incrementation now occurs in db/api.py
- Added separate methods for adding and updating progress updates
Daniel Vincze 5 лет назад
Родитель
Сommit
af51fdb575

+ 12 - 5
coriolis/conductor/rpc/client.py

@@ -277,11 +277,18 @@ class ConductorClient(object):
         self._client.cast(ctxt, 'task_event', task_id=task_id, level=level,
                           message=message)
 
-    def task_progress_update(self, ctxt, task_id, current_step, total_steps,
-                             message):
-        self._client.cast(ctxt, 'task_progress_update', task_id=task_id,
-                          current_step=current_step, total_steps=total_steps,
-                          message=message)
+    def add_task_progress_update(self, ctxt, task_id, total_steps, message):
+        self._client.cast(ctxt, 'add_task_progress_update', task_id=task_id,
+                          total_steps=total_steps, message=message)
+
+    def update_task_progress_update(self, ctxt, task_id, step,
+                                    total_steps, message):
+        self._client.cast(ctxt, 'update_task_progress_update', task_id=task_id,
+                          step=step, total_steps=total_steps, message=message)
+
+    def get_task_progress_step(self, ctxt, task_id):
+        return self._client.call(ctxt, 'get_task_progress_step',
+                                 task_id=task_id)
 
     def create_replica_schedule(self, ctxt, replica_id,
                                 schedule, enabled, exp_date,

+ 14 - 5
coriolis/conductor/rpc/server.py

@@ -3715,9 +3715,8 @@ class ConductorServerEndpoint(object):
         db_api.add_task_event(ctxt, task_id, level, message)
 
     @task_synchronized
-    def task_progress_update(self, ctxt, task_id, current_step, total_steps,
-                             message):
-        LOG.info("Task progress update: %s", task_id)
+    def add_task_progress_update(self, ctxt, task_id, total_steps, message):
+        LOG.info("Adding task progress update: %s", task_id)
         task = db_api.get_task(ctxt, task_id)
         if task.status not in constants.ACTIVE_TASK_STATUSES:
             raise exception.InvalidTaskState(
@@ -3726,8 +3725,18 @@ class ConductorServerEndpoint(object):
                 "Refusing progress update. The progress update string "
                 "was: %s" % (
                     task.id, task.status, task.host, message))
-        db_api.add_task_progress_update(ctxt, task_id, current_step,
-                                        total_steps, message)
+        db_api.add_task_progress_update(ctxt, task_id, total_steps, message)
+
+    @task_synchronized
+    def update_task_progress_update(self, ctxt, task_id, step,
+                                    total_steps, message):
+        LOG.info("Updating task progress update: %s", task_id)
+        db_api.update_task_progress_update(
+            ctxt, task_id, step, total_steps, message)
+
+    @task_synchronized
+    def get_task_progress_step(self, ctxt, task_id):
+        return db_api.get_task_progress_step(ctxt, task_id)
 
     def _get_replica_schedule(self, ctxt, replica_id,
                               schedule_id, expired=True):

+ 29 - 5
coriolis/db/api.py

@@ -722,16 +722,40 @@ def _get_progress_update(context, task_id, current_step):
         models.TaskProgressUpdate.current_step == current_step).first()
 
 
+@enginefacade.reader
+def get_task_progress_step(context, task_id):
+    curr_step = 0
+    q = _soft_delete_aware_query(context, models.TaskProgressUpdate)
+    last_step = q.filter(
+        models.TaskProgressUpdate.task_id == task_id).order_by(
+            models.TaskProgressUpdate.current_step.desc()).first()
+
+    if last_step:
+        curr_step = last_step.current_step
+
+    return curr_step
+
+
+@enginefacade.writer
+def add_task_progress_update(context, task_id, total_steps, message):
+    current_step = get_task_progress_step(context, task_id) + 1
+    task_progress_update = models.TaskProgressUpdate(
+        task_id=task_id, current_step=current_step, total_steps=total_steps,
+        message=message)
+    _session(context).add(task_progress_update)
+
+
 @enginefacade.writer
-def add_task_progress_update(context, task_id, current_step, total_steps,
-                             message):
-    task_progress_update = _get_progress_update(context, task_id, current_step)
+def update_task_progress_update(context, task_id, step, total_steps, message):
+    task_progress_update = _get_progress_update(context, task_id, step)
     if not task_progress_update:
-        task_progress_update = models.TaskProgressUpdate()
+        task_progress_update = models.TaskProgressUpdate(
+            task_id=task_id, current_step=step, total_steps=total_steps,
+            message=message)
         _session(context).add(task_progress_update)
 
     task_progress_update.task_id = task_id
-    task_progress_update.current_step = current_step
+    task_progress_update.current_step = step
     task_progress_update.total_steps = total_steps
     task_progress_update.message = message
 

+ 6 - 1
coriolis/db/sqlalchemy/models.py

@@ -45,6 +45,8 @@ class TaskEvent(BASE, models.TimestampMixin, models.SoftDeleteMixin,
 class TaskProgressUpdate(BASE, models.TimestampMixin, models.SoftDeleteMixin,
                          models.ModelBase):
     __tablename__ = 'task_progress_update'
+    __table_args__ = (
+        schema.UniqueConstraint("task_id", "current_step", "deleted"),)
 
     id = sqlalchemy.Column(sqlalchemy.String(36),
                            default=lambda: str(uuid.uuid4()),
@@ -96,7 +98,9 @@ class Task(BASE, models.TimestampMixin, models.SoftDeleteMixin,
     # TODO(alexpilotti): Add soft delete filter
     progress_updates = orm.relationship(TaskProgressUpdate,
                                         cascade="all,delete",
-                                        backref=orm.backref('task'))
+                                        backref=orm.backref('task'),
+                                        order_by=(
+                                            TaskProgressUpdate.current_step))
 
     def to_dict(self):
         result = {
@@ -295,6 +299,7 @@ class Migration(BaseTransferAction):
         })
         return base
 
+
 class ServiceRegionMapping(
         BASE, models.TimestampMixin, models.ModelBase, models.SoftDeleteMixin):
     __tablename__ = "service_region_mapping"

+ 11 - 9
coriolis/events.py

@@ -18,7 +18,6 @@ class EventManager(object, with_metaclass(abc.ABCMeta)):
 
     def __init__(self, event_handler):
         self._event_handler = event_handler
-        self._current_step = 0
         self._total_steps = None
         self._percentage_steps = {}
 
@@ -34,10 +33,10 @@ class EventManager(object, with_metaclass(abc.ABCMeta)):
             max_value = 0
         if max_value == 0:
             LOG.warn("Max percentage value set to 0 (zero)")
-        self._current_step += 1
-        self._percentage_steps[self._current_step] = _PercStepData(
+        current_step = self._event_handler.get_task_progress_step() + 1
+        self._percentage_steps[current_step] = _PercStepData(
             0, max_value, perc_threshold, message_format)
-        return self._current_step
+        return current_step
 
     def set_percentage_step(self, step, value):
         step_data = self._percentage_steps[step]
@@ -51,17 +50,16 @@ class EventManager(object, with_metaclass(abc.ABCMeta)):
                     step_data.perc_threshold * step_data.perc_threshold)
 
         if perc > old_perc and self._event_handler:
-            self._event_handler.progress_update(
+            self._event_handler.update_task_progress_update(
                 step, self._total_steps, step_data.message_format.format(perc))
             self._percentage_steps[step] = _PercStepData(
                 value, step_data.max_value, step_data.perc_threshold,
                 step_data.message_format)
 
     def progress_update(self, message):
-        self._current_step += 1
         if self._event_handler:
-            self._event_handler.progress_update(
-                self._current_step, self._total_steps, message)
+            self._event_handler.add_task_progress_update(
+                self._total_steps, message)
 
     def info(self, message):
         if self._event_handler:
@@ -79,7 +77,11 @@ class EventManager(object, with_metaclass(abc.ABCMeta)):
 class BaseEventHandler(object, with_metaclass(abc.ABCMeta)):
 
     @abc.abstractmethod
-    def progress_update(self, current_step, total_steps, message):
+    def add_task_progress_update(self, total_steps, message):
+        pass
+
+    @abc.abstractmethod
+    def update_task_progress_update(self, step, total_steps, message):
         pass
 
     @abc.abstractmethod

+ 12 - 3
coriolis/worker/rpc/server.py

@@ -41,10 +41,19 @@ class _ConductorProviderEventHandler(events.BaseEventHandler):
         self._task_id = task_id
         self._rpc_conductor_client = rpc_conductor_client.ConductorClient()
 
-    def progress_update(self, current_step, total_steps, message):
+    def add_task_progress_update(self, total_steps, message):
         LOG.info("Progress update: %s", message)
-        self._rpc_conductor_client.task_progress_update(
-            self._ctxt, self._task_id, current_step, total_steps, message)
+        self._rpc_conductor_client.add_task_progress_update(
+            self._ctxt, self._task_id, total_steps, message)
+
+    def update_task_progress_update(self, step, total_steps, message):
+        LOG.info("Progress update: %s", message)
+        self._rpc_conductor_client.update_task_progress_update(
+            self._ctxt, self._task_id, step, total_steps, message)
+
+    def get_task_progress_step(self):
+        return self._rpc_conductor_client.get_task_progress_step(
+            self._ctxt, self._task_id)
 
     def info(self, message):
         LOG.info(message)