Просмотр исходного кода

Prevent cross-instance task state updates.

Nashwan Azhari 6 лет назад
Родитель
Сommit
692f620041
1 измененных файлов с 33 добавлено и 16 удалено
  1. 33 16
      coriolis/conductor/rpc/server.py

+ 33 - 16
coriolis/conductor/rpc/server.py

@@ -1342,7 +1342,7 @@ class ConductorServerEndpoint(object):
         if ctxt.delete_trust_id:
         if ctxt.delete_trust_id:
             keystone.delete_trust(ctxt)
             keystone.delete_trust(ctxt)
 
 
-    @task_synchronized
+    @task_and_execution_synchronized
     def set_task_host(self, ctxt, task_id, host, process_id):
     def set_task_host(self, ctxt, task_id, host, process_id):
         """ Saves the ID of the worker host which has accepted and started
         """ Saves the ID of the worker host which has accepted and started
         the task to the DB and marks the task as 'RUNNING'. """
         the task to the DB and marks the task as 'RUNNING'. """
@@ -1480,7 +1480,8 @@ class ConductorServerEndpoint(object):
             execution.id, status, task_stat_map)
             execution.id, status, task_stat_map)
         return status
         return status
 
 
-    def _advance_execution_state(self, ctxt, execution, requery=True):
+    def _advance_execution_state(
+            self, ctxt, execution, requery=True, instance=None):
         """ Advances the state of the execution by starting/refreshing
         """ Advances the state of the execution by starting/refreshing
         the state of all child tasks.
         the state of all child tasks.
         If the execution has finalized (either completed or error'd),
         If the execution has finalized (either completed or error'd),
@@ -1507,7 +1508,8 @@ class ConductorServerEndpoint(object):
             LOG.warn(
             LOG.warn(
                 "Execution state advancement called on Execution '%s' which "
                 "Execution state advancement called on Execution '%s' which "
                 "is not in an active status in the DB (it's currently '%s'). "
                 "is not in an active status in the DB (it's currently '%s'). "
-                "Double-checking for deadlock and returning early.")
+                "Double-checking for deadlock and returning early.",
+                execution.id, execution.status)
             if self._check_clean_execution_deadlock(
             if self._check_clean_execution_deadlock(
                     ctxt, execution, task_statuses=None,
                     ctxt, execution, task_statuses=None,
                     requery=not requery) == (
                     requery=not requery) == (
@@ -1518,16 +1520,27 @@ class ConductorServerEndpoint(object):
                     execution.id)
                     execution.id)
             return []
             return []
 
 
+        tasks_to_process = execution.tasks
+        if instance:
+            tasks_to_process = [
+                task for task in execution.tasks
+                if task.instance == instance]
+        if not tasks_to_process:
+            raise exception.InvalidActionTasksExecutionState(
+                "State advancement requested for execution '%s' for "
+                "instance '%s', which has no tasks defined for it." % (
+                    execution.id, instance))
+
         LOG.debug(
         LOG.debug(
             "State of execution '%s' before state advancement is: %s",
             "State of execution '%s' before state advancement is: %s",
             execution.id, execution.status)
             execution.id, execution.status)
 
 
-        started_tasks = []
-
         origin = self._get_task_origin(ctxt, execution.action)
         origin = self._get_task_origin(ctxt, execution.action)
         destination = self._get_task_destination(ctxt, execution.action)
         destination = self._get_task_destination(ctxt, execution.action)
         action = db_api.get_action(ctxt, execution.action_id)
         action = db_api.get_action(ctxt, execution.action_id)
 
 
+        started_tasks = []
+
         def _start_task(task):
         def _start_task(task):
             task_info = None
             task_info = None
             if task.instance not in action.info:
             if task.instance not in action.info:
@@ -1552,7 +1565,7 @@ class ConductorServerEndpoint(object):
                 task_info=task_info)
                 task_info=task_info)
             started_tasks.append(task.id)
             started_tasks.append(task.id)
 
 
-        # aggregate tasks and statuses:
+        # aggregate all tasks and statuses:
         task_statuses = {}
         task_statuses = {}
         task_deps = {}
         task_deps = {}
         on_error_tasks = []
         on_error_tasks = []
@@ -1568,13 +1581,13 @@ class ConductorServerEndpoint(object):
                 on_error_tasks.append(task.id)
                 on_error_tasks.append(task.id)
 
 
         LOG.debug(
         LOG.debug(
-            "Task statuses before execution '%s' lifecycle iteration: %s",
-            execution.id, task_statuses)
+            "All task statuses before execution '%s' lifecycle iteration "
+            "(for tasks of instance '%s'): %s",
+            instance, execution.id, task_statuses)
 
 
         # NOTE: the tasks are saved in a random order in the DB, which
         # NOTE: the tasks are saved in a random order in the DB, which
         # complicates the processing logic so we just pre-sort:
         # complicates the processing logic so we just pre-sort:
-        for task in sorted(
-                execution.tasks, key=lambda t: t.index):
+        for task in sorted(tasks_to_process, key=lambda t: t.index):
 
 
             if task_statuses[task.id] == constants.TASK_STATUS_SCHEDULED:
             if task_statuses[task.id] == constants.TASK_STATUS_SCHEDULED:
 
 
@@ -1705,6 +1718,8 @@ class ConductorServerEndpoint(object):
                     ". Cleanup has been perfomed. Returning early.",
                     ". Cleanup has been perfomed. Returning early.",
                     execution.id)
                     execution.id)
                 return []
                 return []
+            LOG.debug(
+                "No new tasks were started for execution '%s'", execution.id)
 
 
         # check if execution status has changed:
         # check if execution status has changed:
         latest_execution_status = self._get_execution_status(
         latest_execution_status = self._get_execution_status(
@@ -1949,17 +1964,19 @@ class ConductorServerEndpoint(object):
                 ctxt, task, execution, updated_task_info)
                 ctxt, task, execution, updated_task_info)
 
 
             newly_started_tasks = self._advance_execution_state(
             newly_started_tasks = self._advance_execution_state(
-                ctxt, execution)
+                ctxt, execution, instance=task.instance)
             if newly_started_tasks:
             if newly_started_tasks:
                 LOG.info(
                 LOG.info(
                     "The following tasks were started for execution '%s' "
                     "The following tasks were started for execution '%s' "
-                    "following the completion of task '%s': %s" % (
-                        execution.id, task.id, newly_started_tasks))
+                    "following the completion of task '%s' for instance %s: "
+                    "%s" % (
+                        execution.id, task.id, task.instance,
+                        newly_started_tasks))
             else:
             else:
                 LOG.debug(
                 LOG.debug(
-                    "No new tasks were started for execution '%s' following "
-                    "the successful completion of task '%s'.",
-                    execution.id, task.id)
+                    "No new tasks were started for execution '%s' for instance "
+                    "'%s' following the successful completion of task '%s'.",
+                    execution.id, task.instance, task.id)
 
 
     def _cancel_execution_for_osmorphing_debugging(self, ctxt, execution):
     def _cancel_execution_for_osmorphing_debugging(self, ctxt, execution):
         # go through all scheduled tasks and cancel them:
         # go through all scheduled tasks and cancel them: