Bläddra i källkod

Merge pull request #206 from aznashwan/minion-machine-cancellation

Prevent double-deallocation of minion machines for unstarted Executions.
Nashwan Azhari 4 år sedan
förälder
incheckning
edcd897f05

+ 59 - 38
coriolis/conductor/rpc/server.py

@@ -561,8 +561,11 @@ class ConductorServerEndpoint(object):
             # scheduled tasks count as scheduled:
             elif depends_on:
                 for task_id in depends_on:
-                    if [t for t in task.execution.tasks if t.id == task_id and
-                            t.status != constants.TASK_STATUS_ON_ERROR_ONLY]:
+                    if [t
+                        for t in task.execution.tasks
+                        if t.id == task_id and (
+                            (t.status != (
+                                constants.TASK_STATUS_ON_ERROR_ONLY)))]:
                         task.status = constants.TASK_STATUS_SCHEDULED
                         break
             # on_error tasks with no deps are automatically scheduled:
@@ -631,8 +634,8 @@ class ConductorServerEndpoint(object):
 
         newly_started_tasks = []
         for task in execution.tasks:
-            if (not task.depends_on and
-                    task.status == constants.TASK_STATUS_SCHEDULED):
+            if (not task.depends_on and (
+                    task.status == constants.TASK_STATUS_SCHEDULED)):
                 LOG.info(
                     "Starting dependency-less task '%s' for execution '%s'",
                     task.id, execution.id)
@@ -666,7 +669,7 @@ class ConductorServerEndpoint(object):
                 "Started the following tasks for Execution '%s': %s",
                 execution.id, newly_started_tasks)
             self._set_tasks_execution_status(
-                ctxt, execution.id, constants.TASK_STATUS_RUNNING)
+                ctxt, execution, constants.TASK_STATUS_RUNNING)
         else:
             # NOTE: this should never happen if _check_execution_tasks_sanity
             # was called before this method:
@@ -1016,7 +1019,7 @@ class ConductorServerEndpoint(object):
             self._minion_manager_client.allocate_minion_machines_for_replica(
                 ctxt, replica)
             self._set_tasks_execution_status(
-                ctxt, execution.id,
+                ctxt, execution,
                 constants.EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS)
         else:
             self._begin_tasks(ctxt, replica, execution)
@@ -1098,8 +1101,8 @@ class ConductorServerEndpoint(object):
 
         has_tasks = False
         for instance in replica.instances:
-            if (instance in replica.info and
-                    replica.info[instance].get('volumes_info')):
+            if (instance in replica.info and (
+                    replica.info[instance].get('volumes_info'))):
                 source_del_task = self._create_task(
                     instance,
                     constants.TASK_TYPE_DELETE_REPLICA_SOURCE_DISK_SNAPSHOTS,
@@ -1145,8 +1148,8 @@ class ConductorServerEndpoint(object):
                 "the same platform (ex: migrating across public cloud regions)"
                 ", please create two separate endpoints.")
         # TODO(alexpilotti): check Barbican secrets content as well
-        if (origin_endpoint.connection_info ==
-                destination_endpoint.connection_info):
+        if (origin_endpoint.connection_info == (
+                destination_endpoint.connection_info)):
             raise exception.SameDestination()
 
     def create_instances_replica(self, ctxt, origin_endpoint_id,
@@ -1496,7 +1499,7 @@ class ConductorServerEndpoint(object):
                     ctxt, migration, include_transfer_minions=False,
                     include_osmorphing_minions=True)
                 self._set_tasks_execution_status(
-                    ctxt, execution.id,
+                    ctxt, execution,
                     constants.EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS)
         else:
             self._begin_tasks(ctxt, migration, execution)
@@ -1648,7 +1651,7 @@ class ConductorServerEndpoint(object):
         self._cancel_tasks_execution(
             ctxt, last_replica_execution, requery=True)
         self._set_tasks_execution_status(
-            ctxt, last_replica_execution.id,
+            ctxt, last_replica_execution,
             constants.EXECUTION_STATUS_ERROR_ALLOCATING_MINIONS)
 
     @migration_synchronized
@@ -1678,7 +1681,7 @@ class ConductorServerEndpoint(object):
             constants.EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS)
         if migration.last_execution_status != awaiting_minions_status:
             raise exception.InvalidReplicaState(
-                "Machine is in '%s' status instead of the expected '%s' to "
+                "Migration is in '%s' status instead of the expected '%s' to "
                 "have minion machines allocations fail for it." % (
                     migration.last_execution_status, awaiting_minions_status))
 
@@ -1691,7 +1694,7 @@ class ConductorServerEndpoint(object):
         self._cancel_tasks_execution(
             ctxt, execution, requery=True)
         self._set_tasks_execution_status(
-            ctxt, execution.id,
+            ctxt, execution,
             constants.EXECUTION_STATUS_ERROR_ALLOCATING_MINIONS)
 
     def migrate_instances(self, ctxt, origin_endpoint_id,
@@ -2075,7 +2078,7 @@ class ConductorServerEndpoint(object):
                     ctxt, migration, include_transfer_minions=True,
                     include_osmorphing_minions=not skip_os_morphing)
                 self._set_tasks_execution_status(
-                    ctxt, execution.id,
+                    ctxt, execution,
                     constants.EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS)
         else:
             self._begin_tasks(ctxt, migration, execution)
@@ -2140,7 +2143,7 @@ class ConductorServerEndpoint(object):
                 "cancellation is '%s'", execution.id, execution.status)
             # mark execution as cancelling:
             self._set_tasks_execution_status(
-                ctxt, execution.id, constants.EXECUTION_STATUS_CANCELLING)
+                ctxt, execution, constants.EXECUTION_STATUS_CANCELLING)
         elif execution.status == constants.EXECUTION_STATUS_CANCELLING and (
                 not force):
             LOG.info(
@@ -2293,31 +2296,49 @@ class ConductorServerEndpoint(object):
                 "No new tasks were started for execution '%s' following "
                 "state advancement after cancellation.", execution.id)
 
-    def _set_tasks_execution_status(self, ctxt, execution_id, execution_status):
+    def _set_tasks_execution_status(
+            self, ctxt, execution, new_execution_status):
+        previous_execution_status = execution.status
         execution = db_api.set_execution_status(
-            ctxt, execution_id, execution_status)
+            ctxt, execution.id, new_execution_status)
         LOG.info(
             "Tasks execution %(id)s (action %(action)s) status updated "
-            "to: %(status)s",
-            {"id": execution_id, "status": execution_status,
-             "action": execution.action_id})
-
-        if execution_status in constants.FINALIZED_EXECUTION_STATUSES:
-            LOG.debug(
-                "Attempting to deallocate minion machines for finalized "
-                "Execution '%s' of type '%s' (action '%s') following "
-                "its transition into finalized status '%s'",
-                execution.id, execution.type, execution.action_id,
-                execution_status)
-            action = db_api.get_action(ctxt, execution.action_id)
-            self._deallocate_minion_machines_for_action(
-                ctxt, action)
+            "from %(old_status)s to %(new_status)s",
+            {"id": execution.id, "new_status": new_execution_status,
+             "action": execution.action_id,
+             "old_status": previous_execution_status})
+
+        if new_execution_status in constants.FINALIZED_EXECUTION_STATUSES:
+            # NOTE(aznashwan): because the taskflow flows within the minion
+            # manager cannot [currently] be cancelled and are destined to
+            # run to completion, we prevent explicitly requesting their
+            # deallocation. As a result, the machines will be freed once
+            # the minion manager attempts to confirm their allocation
+            # (which the conductor shall refuse)
+            if previous_execution_status == (
+                    constants.EXECUTION_STATUS_AWAITING_MINION_ALLOCATIONS):
+                LOG.warn(
+                    "Execution '%s' was in '%s' status at time of transition "
+                    "to '%s' status. NOT requesting minion machine "
+                    "deallocation from the minion manager." % (
+                        execution.id, previous_execution_status,
+                        new_execution_status))
+            else:
+                LOG.debug(
+                    "Attempting to deallocate minion machines for finalized "
+                    "Execution '%s' of type '%s' (action '%s') following "
+                    "its transition into finalized status '%s'",
+                    execution.id, execution.type, execution.action_id,
+                    new_execution_status)
+                action = db_api.get_action(ctxt, execution.action_id)
+                self._deallocate_minion_machines_for_action(ctxt, action)
 
             if ctxt.delete_trust_id:
                 LOG.debug(
                     "Deleting Keystone trust following status change "
-                    "for Execution '%s' (action '%s') to '%s'",
-                    execution_id, execution.action_id, execution_status)
+                    "for Execution '%s' (action '%s') from '%s' to '%s'",
+                    execution.id, execution.action_id,
+                    previous_execution_status, new_execution_status)
                 keystone.delete_trust(ctxt)
         else:
             LOG.debug(
@@ -2325,7 +2346,7 @@ class ConductorServerEndpoint(object):
                 "of type '%s' (action '%s') yet as it is still in an "
                 "active Execution status (%s)",
                 execution.id, execution.type, execution.action_id,
-                execution_status)
+                new_execution_status)
 
     @parent_tasks_execution_synchronized
     def set_task_host(self, ctxt, task_id, host):
@@ -2435,7 +2456,7 @@ class ConductorServerEndpoint(object):
             LOG.warn(
                 "Marking deadlocked execution '%s' as DEADLOCKED", execution.id)
             self._set_tasks_execution_status(
-                ctxt, execution.id, constants.EXECUTION_STATUS_DEADLOCKED)
+                ctxt, execution, constants.EXECUTION_STATUS_DEADLOCKED)
             LOG.error(
                 "Execution '%s' is deadlocked. Cleanup has been performed. "
                 "Task statuses at time of deadlock were: %s",
@@ -2782,7 +2803,7 @@ class ConductorServerEndpoint(object):
                 execution.id, execution.status,
                 latest_execution_status, task_statuses)
             self._set_tasks_execution_status(
-                ctxt, execution.id, latest_execution_status)
+                ctxt, execution, latest_execution_status)
         else:
             LOG.debug(
                 "Execution '%s' has remained in status '%s' following "
@@ -3351,7 +3372,7 @@ class ConductorServerEndpoint(object):
                         action_id, action.info.get(task.instance, {}).get(
                             'osmorphing_connection_info', {}))
                     self._set_tasks_execution_status(
-                        ctxt, execution.id,
+                        ctxt, execution,
                         constants.EXECUTION_STATUS_CANCELED_FOR_DEBUGGING)
                 else:
                     LOG.warn(

+ 1 - 1
coriolis/minion_manager/rpc/server.py

@@ -1072,7 +1072,7 @@ class MinionManagerServerEndpoint(object):
                     minion_machine.id, minion_machine.allocation_status,
                     minion_machine.allocated_action)
             LOG.debug(
-                "Attempting to deallocate all minion pool machine '%s' "
+                "Attempting to deallocate minion pool machine '%s' "
                 "(currently allocated to action '%s' with status '%s')",
                 minion_machine.id, minion_machine.allocated_action,
                 minion_machine.allocation_status)

+ 60 - 16
coriolis/minion_manager/rpc/tasks.py

@@ -1,5 +1,6 @@
 # Copyright 2020 Cloudbase Solutions Srl
 # All Rights Reserved.
+# pylint: disable=line-too-long
 
 import abc
 import copy
@@ -227,6 +228,11 @@ class _BaseConfirmMinionAllocationForActionTask(
         super(_BaseConfirmMinionAllocationForActionTask, self).__init__(
             name=self._task_name, **kwargs)
 
+    @abc.abstractmethod
+    def _get_action_label(self):
+        raise NotImplementedError(
+            "No minion allocation confirmation task action label defined")
+
     @abc.abstractmethod
     def _get_task_name(self, action_id):
         raise NotImplementedError(
@@ -260,7 +266,8 @@ class _BaseConfirmMinionAllocationForActionTask(
                 raise exception.InvalidMinionMachineState(
                     "Minion machine with ID '%s' of pool '%s' appears to be "
                     "allocated to action with ID '%s' instead of the expected"
-                    " '%s' for it to be used as a '%s' minion for instance '%s'." % (
+                    " '%s' for it to be used as a '%s' minion for instance "
+                    "'%s'." % (
                         minion_machine.id, minion_machine.pool_id,
                         minion_machine.allocated_action, self._action_id,
                         minion_purpose, instance))
@@ -312,7 +319,8 @@ class _BaseConfirmMinionAllocationForActionTask(
                         context, origin_minion_id, raise_if_not_found=True)
                     machines_cache[origin_minion_id] = origin_minion_machine
                     _check_minion_properties(
-                        origin_minion_machine, instance, minion_purpose="source")
+                        origin_minion_machine, instance,
+                        minion_purpose="source")
                 machine_allocations[instance]['origin_minion'] = (
                     origin_minion_machine.to_dict())
 
@@ -351,13 +359,47 @@ class _BaseConfirmMinionAllocationForActionTask(
                 machine_allocations[instance]['osmorphing_minion'] = (
                     osmorphing_minion_machine.to_dict())
 
-        self._confirm_machine_allocation_for_action(
-            context, self._action_id, machine_allocations)
+        try:
+            self._confirm_machine_allocation_for_action(
+                context, self._action_id, machine_allocations)
+        except exception.NotFound as ex:
+            msg = (
+                "The Conductor has refused minion machine allocations for "
+                "%s with ID '%s' as it has purportedly been deleted."
+                " Please check both the Conductor and Minion Manager "
+                "service logs for more details." % (
+                    self._get_action_label().lower().capitalize(),
+                    self._action_id))
+            LOG.error(
+                "%s. Allocations were: %s. Original trace was: %s",
+                msg, machine_allocations, utils.get_exception_details())
+            raise exception.MinionMachineAllocationFailure(
+                msg) from ex
+        except (
+                exception.InvalidMigrationState,
+                exception.InvalidReplicaState) as ex:
+            msg = (
+                "The Conductor has refused minion machine allocations for "
+                "%s with ID '%s' as it is purportedly in an invalid state "
+                "to have minions allocated for it. It is possible "
+                "that the transfer had been user-cancelled or had "
+                "otherwise been halted. Please check both the Conductor "
+                "and Minion Manager service logs for more details." % (
+                    self._get_action_label().lower().capitalize(),
+                    self._action_id))
+            LOG.error(
+                "%s. Allocations were: %s. Original trace was: %s",
+                msg, machine_allocations, utils.get_exception_details())
+            raise exception.MinionMachineAllocationFailure(
+                msg) from ex
 
 
 class ConfirmMinionAllocationForMigrationTask(
         _BaseConfirmMinionAllocationForActionTask):
 
+    def _get_action_label(self):
+        return "migration"
+
     def _get_task_name(self, action_id):
         return MINION_POOL_CONFIRM_MIGRATION_MINION_ALLOCATION_TASK_NAME_FORMAT % (
             action_id)
@@ -371,6 +413,9 @@ class ConfirmMinionAllocationForMigrationTask(
 class ConfirmMinionAllocationForReplicaTask(
         _BaseConfirmMinionAllocationForActionTask):
 
+    def _get_action_label(self):
+        return "replica"
+
     def _get_task_name(self, action_id):
         return MINION_POOL_CONFIRM_REPLICA_MINION_ALLOCATION_TASK_NAME_FORMAT % (
             action_id)
@@ -553,7 +598,7 @@ class ValidateMinionPoolOptionsTask(BaseMinionManangerTask):
     def execute(self, context, origin, destination, task_info):
         self._add_minion_pool_event(
             context, "Validating minion pool options")
-        res = super(ValidateMinionPoolOptionsTask, self).execute(
+        _ = super(ValidateMinionPoolOptionsTask, self).execute(
             context, origin, destination, task_info)
         self._add_minion_pool_event(
             context, "Successfully validated minion pool options")
@@ -789,7 +834,7 @@ class AllocateMinionMachineTask(BaseMinionManangerTask):
         try:
             res = super(AllocateMinionMachineTask, self).execute(
                 context, origin, destination, execution_info)
-        except:
+        except Exception:
             self._set_minion_machine_allocation_status(
                 context, self._minion_pool_id, self._minion_machine_id,
                 constants.MINION_MACHINE_STATUS_ERROR_DEPLOYING)
@@ -847,9 +892,8 @@ class AllocateMinionMachineTask(BaseMinionManangerTask):
             LOG.warn(
                 "[Task '%s'] Allocation task reversion for machine '%s' "
                 "of pool '%s' got an unexpected task result type (%s): %s",
-                    self._task_name, self._minion_machine_id,
-                    self._minion_pool_id, type(original_result),
-                    original_result)
+                self._task_name, self._minion_machine_id,
+                self._minion_pool_id, type(original_result), original_result)
             minion_provider_properties = None
 
         # default to any minion properties found in the task_info:
@@ -882,7 +926,7 @@ class AllocateMinionMachineTask(BaseMinionManangerTask):
                 if not minion_provider_properties and (
                         machine_db_entry.provider_properties):
                     minion_provider_properties = (
-                         machine_db_entry.provider_properties)
+                        machine_db_entry.provider_properties)
                     LOG.debug(
                         "[Task '%s'] Using minion provider properties of "
                         "minion machine with ID '%s' from DB entry during the "
@@ -895,7 +939,7 @@ class AllocateMinionMachineTask(BaseMinionManangerTask):
                 self._task_name, self._minion_machine_id)
             try:
                 db_api.delete_minion_machine(context, self._minion_machine_id)
-            except Exception as ex:
+            except Exception:
                 LOG.warn(
                     "[Task '%s'] Failed to delete DB entry for minion machine "
                     "'%s' following reversion of its allocation task. Error "
@@ -917,7 +961,7 @@ class AllocateMinionMachineTask(BaseMinionManangerTask):
             try:
                 super(AllocateMinionMachineTask, self).revert(
                     context, origin, destination, cleanup_info, **kwargs)
-            except Exception as ex:
+            except Exception:
                 log_msg = (
                     "[Task '%s'] Exception occurred while attempting to revert "
                     "deployment of minion machine with ID '%s' for pool '%s'." % (
@@ -976,7 +1020,7 @@ class DeallocateMinionMachineTask(BaseMinionManangerTask):
             try:
                 _ = super(DeallocateMinionMachineTask, self).execute(
                     context, origin, destination, execution_info)
-            except Exception as ex:
+            except Exception:
                 base_msg = (
                    "Exception occurred while deallocating minion machine '%s'. "
                     "There might be leftover instance resources requiring "
@@ -1145,7 +1189,7 @@ class MinionMachineHealtchcheckDecider(object):
         if not history and healthcheck_task_name not in history:
             LOG.warn(
                "Could not find healthcheck result for minion machine '%s' "
-                "of pool '%s' (task name '%s'). NOT grennlighting futher "
+                "of pool '%s' (task name '%s'). NOT greenlighting further "
                 "tasks.", self._minion_machine_id, self._minion_pool_id,
                 healthcheck_task_name)
             return False
@@ -1223,7 +1267,7 @@ class PowerOnMinionMachineTask(BaseMinionManangerTask):
                 context,
                 "Successfully powered on minion machine with internal pool "
                 "ID '%s'" % self._minion_machine_id)
-        except Exception as ex:
+        except Exception:
             base_msg = (
                 "[Task '%s'] Exception occurred while powering on minion "
                 "machine with ID '%s' of pool '%s'." % (
@@ -1303,7 +1347,7 @@ class PowerOffMinionMachineTask(BaseMinionManangerTask):
                 context,
                 "Successfully powered off minion machine with internal pool "
                 "ID '%s'" % self._minion_machine_id)
-        except Exception as ex:
+        except Exception:
             base_msg = (
                 "[Task '%s'] Exception occurred while powering off minion "
                 "machine with ID '%s' of pool '%s'." % (