Prechádzať zdrojové kódy

Prevent conductor from double-deallocating minion machines.

When cancelling an Execution which is in 'AWAITING_MINION_ALLOCATIONS'
status, the Conductor issues an explicit deallocation call to the Minion
Manager which trips up its TaskFlow allocation flow for said minions.
Considering it is not [currently] possible to cancel taskflow tasks,
this patch makes the Conductor skip the double-deallocation if the
Execution is in 'AWAITING_MINION_ALLOCATIONS' status.
As a result, the Minion Manager will automatically clear the allocations
once its Flow is finished and the minion reporting back to the Conductor
is refused.
Nashwan Azhari 4 rokov pred
rodič
commit
5c0767bde6
1 zmenil súbory, kde vykonal 47 pridanie a 29 odobranie
  1. 47 29
      coriolis/conductor/rpc/server.py

+ 47 - 29
coriolis/conductor/rpc/server.py

@@ -669,7 +669,7 @@ class ConductorServerEndpoint(object):
                 "Started the following tasks for Execution '%s': %s",
                 execution.id, newly_started_tasks)
             self._set_tasks_execution_status(
-                ctxt, execution.id, constants.TASK_STATUS_RUNNING)
+                ctxt, execution, constants.TASK_STATUS_RUNNING)
         else:
             # NOTE: this should never happen if _check_execution_tasks_sanity
             # was called before this method:
@@ -1019,7 +1019,7 @@ class ConductorServerEndpoint(object):
             self._minion_manager_client.allocate_minion_machines_for_replica(
                 ctxt, replica)
             self._set_tasks_execution_status(
-                ctxt, execution.id,
+                ctxt, execution,
                 constants.EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS)
         else:
             self._begin_tasks(ctxt, replica, execution)
@@ -1499,7 +1499,7 @@ class ConductorServerEndpoint(object):
                     ctxt, migration, include_transfer_minions=False,
                     include_osmorphing_minions=True)
                 self._set_tasks_execution_status(
-                    ctxt, execution.id,
+                    ctxt, execution,
                     constants.EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS)
         else:
             self._begin_tasks(ctxt, migration, execution)
@@ -1651,7 +1651,7 @@ class ConductorServerEndpoint(object):
         self._cancel_tasks_execution(
             ctxt, last_replica_execution, requery=True)
         self._set_tasks_execution_status(
-            ctxt, last_replica_execution.id,
+            ctxt, last_replica_execution,
             constants.EXECUTION_STATUS_ERROR_ALLOCATING_MINIONS)
 
     @migration_synchronized
@@ -1694,7 +1694,7 @@ class ConductorServerEndpoint(object):
         self._cancel_tasks_execution(
             ctxt, execution, requery=True)
         self._set_tasks_execution_status(
-            ctxt, execution.id,
+            ctxt, execution,
             constants.EXECUTION_STATUS_ERROR_ALLOCATING_MINIONS)
 
     def migrate_instances(self, ctxt, origin_endpoint_id,
@@ -2078,7 +2078,7 @@ class ConductorServerEndpoint(object):
                     ctxt, migration, include_transfer_minions=True,
                     include_osmorphing_minions=not skip_os_morphing)
                 self._set_tasks_execution_status(
-                    ctxt, execution.id,
+                    ctxt, execution,
                     constants.EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS)
         else:
             self._begin_tasks(ctxt, migration, execution)
@@ -2143,7 +2143,7 @@ class ConductorServerEndpoint(object):
                 "cancellation is '%s'", execution.id, execution.status)
             # mark execution as cancelling:
             self._set_tasks_execution_status(
-                ctxt, execution.id, constants.EXECUTION_STATUS_CANCELLING)
+                ctxt, execution, constants.EXECUTION_STATUS_CANCELLING)
         elif execution.status == constants.EXECUTION_STATUS_CANCELLING and (
                 not force):
             LOG.info(
@@ -2296,31 +2296,49 @@ class ConductorServerEndpoint(object):
                 "No new tasks were started for execution '%s' following "
                 "state advancement after cancellation.", execution.id)
 
-    def _set_tasks_execution_status(self, ctxt, execution_id, execution_status):
+    def _set_tasks_execution_status(
+            self, ctxt, execution, new_execution_status):
+        previous_execution_status = execution.status
         execution = db_api.set_execution_status(
-            ctxt, execution_id, execution_status)
+            ctxt, execution.id, new_execution_status)
         LOG.info(
             "Tasks execution %(id)s (action %(action)s) status updated "
-            "to: %(status)s",
-            {"id": execution_id, "status": execution_status,
-             "action": execution.action_id})
-
-        if execution_status in constants.FINALIZED_EXECUTION_STATUSES:
-            LOG.debug(
-                "Attempting to deallocate minion machines for finalized "
-                "Execution '%s' of type '%s' (action '%s') following "
-                "its transition into finalized status '%s'",
-                execution.id, execution.type, execution.action_id,
-                execution_status)
-            action = db_api.get_action(ctxt, execution.action_id)
-            self._deallocate_minion_machines_for_action(
-                ctxt, action)
+            "from %(old_status)s to %(new_status)s",
+            {"id": execution.id, "new_status": new_execution_status,
+             "action": execution.action_id,
+             "old_status": previous_execution_status})
+
+        if new_execution_status in constants.FINALIZED_EXECUTION_STATUSES:
+            # NOTE(aznashwan): because the taskflow flows within the minion
+            # manager cannot [currently] be cancelled and are destined to
+            # run to completion, we prevent explicitly requesting their
+            # deallocation. As a result, the machines will be freed once
+            # the minion manager attempts to confirm their allocation
+            # (which the conductor shall refuse)
+            if previous_execution_status == (
+                    constants.EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS):
+                LOG.warn(
+                    "Execution '%s' was in '%s' status at time of transition "
+                    "to '%s' status. NOT requesting minion machine "
+                    "deallocation from the minion manager." % (
+                        execution.id, previous_execution_status,
+                        new_execution_status))
+            else:
+                LOG.debug(
+                    "Attempting to deallocate minion machines for finalized "
+                    "Execution '%s' of type '%s' (action '%s') following "
+                    "its transition into finalized status '%s'",
+                    execution.id, execution.type, execution.action_id,
+                    new_execution_status)
+                action = db_api.get_action(ctxt, execution.action_id)
+                self._deallocate_minion_machines_for_action(ctxt, action)
 
             if ctxt.delete_trust_id:
                 LOG.debug(
                     "Deleting Keystone trust following status change "
-                    "for Execution '%s' (action '%s') to '%s'",
-                    execution_id, execution.action_id, execution_status)
+                    "for Execution '%s' (action '%s') from '%s' to '%s'",
+                    execution.id, execution.action_id,
+                    previous_execution_status, new_execution_status)
                 keystone.delete_trust(ctxt)
         else:
             LOG.debug(
@@ -2328,7 +2346,7 @@ class ConductorServerEndpoint(object):
                 "of type '%s' (action '%s') yet as it is still in an "
                 "active Execution status (%s)",
                 execution.id, execution.type, execution.action_id,
-                execution_status)
+                new_execution_status)
 
     @parent_tasks_execution_synchronized
     def set_task_host(self, ctxt, task_id, host):
@@ -2438,7 +2456,7 @@ class ConductorServerEndpoint(object):
             LOG.warn(
                 "Marking deadlocked execution '%s' as DEADLOCKED", execution.id)
             self._set_tasks_execution_status(
-                ctxt, execution.id, constants.EXECUTION_STATUS_DEADLOCKED)
+                ctxt, execution, constants.EXECUTION_STATUS_DEADLOCKED)
             LOG.error(
                 "Execution '%s' is deadlocked. Cleanup has been performed. "
                 "Task statuses at time of deadlock were: %s",
@@ -2785,7 +2803,7 @@ class ConductorServerEndpoint(object):
                 execution.id, execution.status,
                 latest_execution_status, task_statuses)
             self._set_tasks_execution_status(
-                ctxt, execution.id, latest_execution_status)
+                ctxt, execution, latest_execution_status)
         else:
             LOG.debug(
                 "Execution '%s' has remained in status '%s' following "
@@ -3354,7 +3372,7 @@ class ConductorServerEndpoint(object):
                         action_id, action.info.get(task.instance, {}).get(
                             'osmorphing_connection_info', {}))
                     self._set_tasks_execution_status(
-                        ctxt, execution.id,
+                        ctxt, execution,
                         constants.EXECUTION_STATUS_CANCELED_FOR_DEBUGGING)
                 else:
                     LOG.warn(