Răsfoiți Sursa

Add minion machine shutdown retention functionality.

Nashwan Azhari 5 ani în urmă
părinte
comite
677067ac75

+ 23 - 11
coriolis/constants.py

@@ -177,18 +177,29 @@ TASK_TYPE_RELEASE_OSMORPHING_MINION = "RELEASE_OSMORPHING_MINION"
 TASK_TYPE_COLLECT_OSMORPHING_INFO = "COLLECT_OS_MORPHING_INFO"
 TASK_TYPE_HEALTHCHECK_SOURCE_MINION = "HEALTHCHECK_SOURCE_MINION"
 TASK_TYPE_HEALTHCHECK_DESTINATION_MINION = "HEALTHCHECK_DESTINATION_MINION"
+TASK_TYPE_POWER_ON_SOURCE_MINION = "POWER_ON_SOURCE_MINION"
+TASK_TYPE_POWER_OFF_SOURCE_MINION = "POWER_OFF_SOURCE_MINION"
+TASK_TYPE_POWER_ON_DESTINATION_MINION = "POWER_ON_DESTINATION_MINION"
+TASK_TYPE_POWER_OFF_DESTINATION_MINION = "POWER_OFF_DESTINATION_MINION"
+
 
 MINION_POOL_OPERATIONS_TASKS = [
     TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_OPTIONS,
     TASK_TYPE_VALIDATE_DESTINATION_MINION_POOL_OPTIONS,
     TASK_TYPE_SET_UP_SOURCE_POOL_SHARED_RESOURCES,
+    TASK_TYPE_TEAR_DOWN_SOURCE_POOL_SHARED_RESOURCES,
     TASK_TYPE_SET_UP_DESTINATION_POOL_SHARED_RESOURCES,
+    TASK_TYPE_TEAR_DOWN_DESTINATION_POOL_SHARED_RESOURCES,
     TASK_TYPE_CREATE_SOURCE_MINION_MACHINE,
+    TASK_TYPE_DELETE_SOURCE_MINION_MACHINE,
     TASK_TYPE_CREATE_DESTINATION_MINION_MACHINE,
-    TASK_TYPE_TEAR_DOWN_SOURCE_POOL_SHARED_RESOURCES,
-    TASK_TYPE_TEAR_DOWN_DESTINATION_POOL_SHARED_RESOURCES,
+    TASK_TYPE_DELETE_DESTINATION_MINION_MACHINE,
     TASK_TYPE_HEALTHCHECK_SOURCE_MINION,
-    TASK_TYPE_HEALTHCHECK_DESTINATION_MINION
+    TASK_TYPE_HEALTHCHECK_DESTINATION_MINION,
+    TASK_TYPE_POWER_ON_SOURCE_MINION,
+    TASK_TYPE_POWER_OFF_SOURCE_MINION,
+    TASK_TYPE_POWER_ON_DESTINATION_MINION,
+    TASK_TYPE_POWER_OFF_DESTINATION_MINION
 ]
 
 TASK_PLATFORM_SOURCE = "source"
@@ -288,14 +299,6 @@ EXECUTION_TYPE_MINION_POOL_ALLOCATE_MINIONS = "minion_pool_allocate_minions"
 EXECUTION_TYPE_MINION_POOL_DEALLOCATE_MINIONS = (
     "minion_pool_deallocate_minions")
 
-MINION_POOL_EXECUTION_TYPES = [
-    EXECUTION_TYPE_MINION_POOL_MAINTENANCE,
-    EXECUTION_TYPE_MINION_POOL_UPDATE,
-    EXECUTION_TYPE_MINION_POOL_SET_UP_SHARED_RESOURCES,
-    EXECUTION_TYPE_MINION_POOL_TEAR_DOWN_SHARED_RESOURCES,
-    EXECUTION_TYPE_MINION_POOL_ALLOCATE_MINIONS,
-    EXECUTION_TYPE_MINION_POOL_DEALLOCATE_MINIONS]
-
 TASK_LOCK_NAME_FORMAT = "task-%s"
 TASKFLOW_LOCK_NAME_FORMAT = "taskflow-%s"
 EXECUTION_LOCK_NAME_FORMAT = "execution-%s"
@@ -363,7 +366,16 @@ MINION_MACHINE_STATUS_HEALTHCHECKING = "HEALTHCHECKING"
 MINION_MACHINE_STATUS_ALLOCATING = "ALLOCATING"
 MINION_MACHINE_STATUS_DEALLOCATING = "DEALLOCATING"
 MINION_MACHINE_STATUS_ERROR = "ERROR"
+MINION_MACHINE_STATUS_POWERING_OFF = "POWERING_OFF"
+MINION_MACHINE_STATUS_POWER_ERROR = "POWER_ERROR"
 MINION_MACHINE_STATUS_ERROR_DEPLOYING = "ERROR_DEPLOYING"
 MINION_MACHINE_STATUS_AVAILABLE = "AVAILABLE"
 MINION_MACHINE_STATUS_IN_USE = "IN_USE"
 MINION_MACHINE_STATUS_RESERVED = "RESERVED"
+
+MINION_MACHINE_POWER_STATUS_UNKNOWN = "UNKNOWN"
+MINION_MACHINE_POWER_STATUS_UNINITIALIZED = "UNINITIALIZED"
+MINION_MACHINE_POWER_STATUS_POWERED_ON = "POWERED_ON"
+MINION_MACHINE_POWER_STATUS_POWERED_OFF = "POWERED_OFF"
+MINION_MACHINE_POWER_STATUS_POWERING_ON = "POWERING_ON"
+MINION_MACHINE_POWER_STATUS_POWERING_OFF = "POWERING_OFF"

+ 14 - 7
coriolis/db/api.py

@@ -1188,6 +1188,13 @@ def get_mapped_services_for_region(context, region_id):
 def add_minion_machine(context, minion_machine):
     minion_machine.user_id = context.user
     minion_machine.project_id = context.tenant
+    # inherit pool user/tenant if none are given:
+    if None in [minion_machine.user_id, minion_machine.project_id]:
+        pool = get_minion_pool(context, minion_machine.pool_id)
+        if not minion_machine.user_id:
+            minion_machine.user_id = pool.user_id
+        if not minion_machine.project_id:
+            minion_machine.project_id = pool.project_id
     _session(context).add(minion_machine)
 
 
@@ -1218,15 +1225,15 @@ def update_minion_machine(context, minion_machine_id, updated_values):
             "MinionMachine with ID '%s' does not exist." % minion_machine_id)
 
     updateable_fields = [
-        "connection_info", "provider_properties", "status",
+        "connection_info", "provider_properties", "allocation_status",
         "backup_writer_connection_info", "allocated_action",
-        "last_used_at"]
+        "last_used_at", "power_status"]
     _update_sqlalchemy_object_fields(
         minion_machine, updateable_fields, updated_values)
 
 
 @enginefacade.writer
-def set_minion_machine_status(context, minion_machine_id, status):
+def set_minion_machine_allocation_status(context, minion_machine_id, status):
     machine = get_minion_machine(context, minion_machine_id)
     if not machine:
         raise exception.NotFound(
@@ -1234,8 +1241,8 @@ def set_minion_machine_status(context, minion_machine_id, status):
     LOG.debug(
         "Transitioning minion machine '%s' (pool '%s') from status '%s' to "
         "'%s' in the DB",
-        minion_machine_id, machine.pool_id, machine.status, status)
-    machine.status = status
+        minion_machine_id, machine.pool_id, machine.allocation_status, status)
+    machine.allocation_status = status
     setattr(machine, 'updated_at', timeutils.utcnow())
 
 
@@ -1259,12 +1266,12 @@ def set_minion_machines_allocation_statuses(
         LOG.debug(
             "Changing allocation status in DB for minion machine '%s' "
             "from '%s' to '%s' and allocated action from '%s' to '%s'" % (
-                machine.id, machine.status, allocation_status,
+                machine.id, machine.allocation_status, allocation_status,
                 machine.allocated_action, action_id))
         machine.allocated_action = action_id
         if refresh_allocation_time:
             machine.last_used_at = timeutils.utcnow()
-        machine.status = allocation_status
+        machine.allocation_status = allocation_status
 
 
 @enginefacade.writer

+ 5 - 2
coriolis/db/sqlalchemy/migrate_repo/versions/016_adds_minion_vm_pools.py

@@ -91,8 +91,11 @@ def upgrade(migrate_engine):
             sqlalchemy.Column(
                 'last_used_at', sqlalchemy.DateTime, nullable=True),
             sqlalchemy.Column(
-                'status', sqlalchemy.String(255), nullable=False,
-                default=lambda: "UNKNOWN"),
+                'allocation_status', sqlalchemy.String(255), nullable=False,
+                default=lambda: "UNINITIALIZED"),
+            sqlalchemy.Column(
+                'power_status', sqlalchemy.String(255), nullable=False,
+                default=lambda: "UNINITIALIZED"),
             sqlalchemy.Column('connection_info', sqlalchemy.Text),
             sqlalchemy.Column(
                 'backup_writer_connection_info', sqlalchemy.Text,

+ 6 - 2
coriolis/db/sqlalchemy/models.py

@@ -484,13 +484,16 @@ class MinionMachine(BASE, models.TimestampMixin, models.ModelBase,
         sqlalchemy.ForeignKey('minion_pool.id'),
         nullable=False)
 
-    status = sqlalchemy.Column(
+    allocation_status = sqlalchemy.Column(
         sqlalchemy.String(255), nullable=False,
         default=lambda: constants.MINION_MACHINE_STATUS_UNINITIALIZED)
 
     allocated_action = sqlalchemy.Column(
         sqlalchemy.String(36), nullable=True)
 
+    power_status = sqlalchemy.Column(
+        sqlalchemy.String(255), nullable=False)
+
     last_used_at = sqlalchemy.Column(
         sqlalchemy.types.DateTime, nullable=True)
 
@@ -513,7 +516,8 @@ class MinionMachine(BASE, models.TimestampMixin, models.ModelBase,
             "deleted_at": self.deleted_at,
             "deleted": self.deleted,
             "pool_id": self.pool_id,
-            "status": self.status,
+            "allocation_status": self.allocation_status,
+            "power_status": self.power_status,
             "connection_info": self.connection_info,
             "allocated_action": self.allocated_action,
             "last_used_at": self.last_used_at,

+ 157 - 58
coriolis/minion_manager/rpc/server.py

@@ -27,6 +27,7 @@ from coriolis.minion_manager.rpc import tasks as minion_manager_tasks
 from coriolis.minion_manager.rpc import utils as minion_manager_utils
 from coriolis.scheduler.rpc import client as rpc_scheduler_client
 from coriolis.taskflow import runner as taskflow_runner
+from coriolis.taskflow import utils as taskflow_utils
 from coriolis.worker.rpc import client as rpc_worker_client
 
 
@@ -533,7 +534,7 @@ class MinionManagerServerEndpoint(object):
         """
         currently_available_machines = [
             machine for machine in minion_pool.minion_machines
-            if machine.status == constants.MINION_MACHINE_STATUS_AVAILABLE]
+            if machine.allocation_status == constants.MINION_MACHINE_STATUS_AVAILABLE]
         extra_available_machine_slots = (
             minion_pool.maximum_minions - len(minion_pool.minion_machines))
         num_instances = len(action_instances)
@@ -557,19 +558,19 @@ class MinionManagerServerEndpoint(object):
             # NOTE(aznashwan): this will iterate through machines in a set
             # order every time, thus ensuring that some are preferred over
             # others and facilitating some to be left unused and thus torn
-            # during the periodic refreshes:
+            # down during the periodic refreshes:
             for machine in minion_pool.minion_machines:
                 if exclude and machine.id in exclude:
                     LOG.debug(
                         "Excluding minion machine '%s' from search for use "
                         "action '%s'", machine.id, action_id)
                     continue
-                if machine.status != constants.MINION_MACHINE_STATUS_AVAILABLE:
+                if machine.allocation_status != constants.MINION_MACHINE_STATUS_AVAILABLE:
                     LOG.debug(
                         "Minion machine with ID '%s' is in status '%s' "
                         "instead of the expected '%s'. Skipping for use "
                         "with action '%s'.",
-                        machine.id, machine.status,
+                        machine.id, machine.allocation_status,
                         constants.MINION_MACHINE_STATUS_AVAILABLE, action_id)
                     continue
                 selected_machine = machine
@@ -600,8 +601,9 @@ class MinionManagerServerEndpoint(object):
                     minion_machine.id, minion_pool.id, action_id)
                 allocation_subflow.add(
                     self._get_healtchcheck_flow_for_minion_machine(
-                        minion_pool, minion_machine.id,
+                        minion_pool, minion_machine,
                         allocate_to_action=action_id,
+                        power_on_machine=True,
                         inject_for_tasks=inject_for_tasks,
                         machine_status_on_success=(
                             constants.MINION_MACHINE_STATUS_IN_USE)))
@@ -616,8 +618,10 @@ class MinionManagerServerEndpoint(object):
                 new_minion_machine = models.MinionMachine()
                 new_minion_machine.id = new_machine_id
                 new_minion_machine.pool_id = minion_pool.id
-                new_minion_machine.status = (
+                new_minion_machine.allocation_status = (
                     constants.MINION_MACHINE_STATUS_UNINITIALIZED)
+                new_minion_machine.power_status = (
+                    constants.MINION_MACHINE_POWER_STATUS_UNINITIALIZED)
                 new_minion_machine.allocated_action = action_id
                 machine_db_entries_to_add.append(new_minion_machine)
 
@@ -639,8 +643,13 @@ class MinionManagerServerEndpoint(object):
                 minion_pool.id, action_id, existing_machines_to_allocate)
             db_api.set_minion_machines_allocation_statuses(
                 ctxt, list(existing_machines_to_allocate.keys()),
-                action_id, constants.MINION_MACHINE_STATUS_IN_USE,
+                action_id, constants.MINION_MACHINE_STATUS_RESERVED,
                 refresh_allocation_time=True)
+            self._add_minion_pool_event(
+                ctxt, minion_pool.id, constants.TASK_EVENT_INFO,
+                "The following pre-existing minion machines will be allocated "
+                "to transfer action '%s': %s" % (
+                    action_id, list(existing_machines_to_allocate.keys())))
 
             # add any new machine entries to the DB:
             for new_machine in machine_db_entries_to_add:
@@ -650,6 +659,11 @@ class MinionManagerServerEndpoint(object):
                     new_machine_id, minion_pool.id, action_id)
                 db_api.add_minion_machine(ctxt, new_machine)
                 new_machine_db_entries_added.append(new_machine.id)
+            self._add_minion_pool_event(
+                ctxt, minion_pool.id, constants.TASK_EVENT_INFO,
+                "The following new minion machines will be created for use"
+                " in transfer action '%s': %s" % (
+                    action_id, [m.id for m in machine_db_entries_to_add]))
         except Exception as ex:
             LOG.warn(
                 "Exception occurred while adding new minion machine entries to"
@@ -960,17 +974,17 @@ class MinionManagerServerEndpoint(object):
 
         pool_machine_mappings = {}
         for machine in machines:
-            if machine.status not in targeted_statuses:
+            if machine.allocation_status not in targeted_statuses:
                 LOG.debug(
                     "Skipping deletion of machine '%s' from pool '%s' as "
                     "its status (%s) is not one of the targeted statuses (%s)",
-                    machine.id, machine.pool_id, machine.status,
+                    machine.id, machine.pool_id, machine.allocation_status,
                     targeted_statuses)
                 continue
             if machine.pool_id in exclude_pools:
                 LOG.debug(
                     "Skipping deletion of machine '%s' (status '%s') from "
-                    "whitelisted pool '%s'", machine.id, machine.status,
+                    "whitelisted pool '%s'", machine.id, machine.allocation_status,
                     machine.pool_id)
                 continue
 
@@ -985,7 +999,7 @@ class MinionManagerServerEndpoint(object):
                 for machine in machines:
                     LOG.debug(
                         "Deleting machine with ID '%s' (pool '%s', status '%s') "
-                        "from the DB.", machine.id, pool_id, machine.status)
+                        "from the DB.", machine.id, pool_id, machine.allocation_status)
                     db_api.delete_minion_machine(ctxt, machine.id)
 
     def deallocate_minion_machine(self, ctxt, minion_machine_id):
@@ -1002,22 +1016,22 @@ class MinionManagerServerEndpoint(object):
         machine_allocated_status = constants.MINION_MACHINE_STATUS_IN_USE
         with minion_manager_utils.get_minion_pool_lock(
                 minion_machine.pool_id, external=True):
-            if minion_machine.status != machine_allocated_status or (
+            if minion_machine.allocation_status != machine_allocated_status or (
                     not minion_machine.allocated_action):
                 LOG.warn(
                     "Minion machine '%s' was either in an improper status (%s)"
                     ", or did not have an associated action ('%s') for "
                     "deallocation request. Marking as available anyway.",
-                    minion_machine.id, minion_machine.status,
+                    minion_machine.id, minion_machine.allocation_status,
                     minion_machine.allocated_action)
             LOG.debug(
                 "Attempting to deallocate all minion pool machine '%s' "
                 "(currently allocated to action '%s' with status '%s')",
                 minion_machine.id, minion_machine.allocated_action,
-                minion_machine.status)
+                minion_machine.allocation_status)
             db_api.update_minion_machine(
                 ctxt, minion_machine.id, {
-                    "status": constants.MINION_MACHINE_STATUS_AVAILABLE,
+                    "allocation_status": constants.MINION_MACHINE_STATUS_AVAILABLE,
                     "allocated_action": None})
             LOG.debug(
                 "Successfully deallocated minion machine with '%s'.",
@@ -1052,13 +1066,13 @@ class MinionManagerServerEndpoint(object):
                 # are added to the DB without their point of deployment being
                 # reached for them to ever get out of 'UNINITIALIZED' status:
                 for machine in pool_machines:
-                    if machine.status == (
+                    if machine.allocation_status == (
                             constants.MINION_MACHINE_STATUS_UNINITIALIZED):
                         LOG.warn(
                             "Found minion machine '%s' in pool '%s' which "
                             "is in '%s' status. Removing from the DB "
                             "entirely." % (
-                                machine.id, pool_id, machine.status))
+                                machine.id, pool_id, machine.allocation_status))
                         db_api.delete_minion_machine(
                             ctxt, machine.id)
                         LOG.info(
@@ -1069,7 +1083,7 @@ class MinionManagerServerEndpoint(object):
                         "Going to mark minion machine '%s' (current status "
                         "'%s') of pool '%s' as available following machine "
                         "deallocation request for action '%s'.",
-                        machine.id, machine.status, pool_id, action_id)
+                        machine.id, machine.allocation_status, pool_id, action_id)
                     machine_ids_to_deallocate.append(machine.id)
 
                 LOG.info(
@@ -1086,35 +1100,70 @@ class MinionManagerServerEndpoint(object):
             "with action with base_id '%s'.", action_id)
 
     def _get_healtchcheck_flow_for_minion_machine(
-            self, minion_pool, minion_machine_id, allocate_to_action=None,
+            self, minion_pool, minion_machine, allocate_to_action=None,
             machine_status_on_success=constants.MINION_MACHINE_STATUS_AVAILABLE,
-            inject_for_tasks=None):
+            power_on_machine=True, inject_for_tasks=None):
         """ Returns a taskflow graph flow with a healtcheck task
         and redeployment subflow on error. """
         # define healthcheck subflow for each machine:
         machine_healthcheck_subflow = graph_flow.Flow(
             minion_manager_tasks.MINION_POOL_HEALTHCHECK_MACHINE_SUBFLOW_NAME_FORMAT % (
-                minion_pool.id, minion_machine_id))
+                minion_pool.id, minion_machine.id))
 
         # add healthcheck task to healthcheck subflow:
         machine_healthcheck_task = (
             minion_manager_tasks.HealthcheckMinionMachineTask(
-                minion_pool.id, minion_machine_id, minion_pool.platform,
+                minion_pool.id, minion_machine.id, minion_pool.platform,
                 machine_status_on_success=machine_status_on_success,
-                fail_on_error=False, inject=inject_for_tasks))
+                inject=inject_for_tasks,
+                # we prevent a raise here as the healthcheck subflow
+                # will take care of redeploying the instance later:
+                fail_on_error=False))
         machine_healthcheck_subflow.add(machine_healthcheck_task)
 
+        # optionally add minion machine power on task:
+        if power_on_machine:
+            if minion_machine.power_status == (
+                    constants.MINION_MACHINE_POWER_STATUS_POWERED_OFF):
+                power_on_task = minion_manager_tasks.PowerOnMinionMachineTask(
+                    minion_pool.id, minion_machine.id, minion_pool.platform,
+                    inject=inject_for_tasks,
+                    # we prevent a raise here as the healthcheck subflow
+                    # will take care of redeploying the instance later:
+                    fail_on_error=False)
+                machine_healthcheck_subflow.add(
+                    power_on_task,
+                    # NOTE: this is required to not have taskflow attempt
+                    # (and fail) to automatically link the above Healthcheck
+                    # task to the power on task based on inputs/outputs alone:
+                    resolve_existing=False)
+                machine_healthcheck_subflow.link(
+                    power_on_task, machine_healthcheck_task,
+                    # NOTE: taskflow gets confused when a task in the graph
+                    # flow is linked to two others with no decider for each
+                    # so we have to add a dummy decider which will always
+                    # greenlight the rest of the execution here:
+                    decider=taskflow_utils.DummyDecider(allow=True),
+                    decider_depth=taskflow_deciders.Depth.FLOW)
+            else:
+                LOG.debug(
+                    "Minion Machine with ID '%s' of pool '%s' is in power "
+                    "state '%s' during healtchcheck subflow definition. "
+                    "Not adding any power on task for it.",
+                    minion_machine.id, minion_machine.pool_id,
+                    minion_machine.power_status)
+
         # define reallocation subflow:
         machine_reallocation_subflow = linear_flow.Flow(
             minion_manager_tasks.MINION_POOL_REALLOCATE_MACHINE_SUBFLOW_NAME_FORMAT % (
-                minion_pool.id, minion_machine_id))
+                minion_pool.id, minion_machine.id))
         machine_reallocation_subflow.add(
             minion_manager_tasks.DeallocateMinionMachineTask(
-                minion_pool.id, minion_machine_id, minion_pool.platform,
+                minion_pool.id, minion_machine.id, minion_pool.platform,
                 inject=inject_for_tasks))
         machine_reallocation_subflow.add(
             minion_manager_tasks.AllocateMinionMachineTask(
-                minion_pool.id, minion_machine_id, minion_pool.platform,
+                minion_pool.id, minion_machine.id, minion_pool.platform,
                 allocate_to_action=allocate_to_action,
                 inject=inject_for_tasks))
         machine_healthcheck_subflow.add(
@@ -1130,7 +1179,7 @@ class MinionManagerServerEndpoint(object):
             # NOTE: this is required to prevent any parent flows from skipping:
             decider_depth=taskflow_deciders.Depth.FLOW,
             decider=minion_manager_tasks.MinionMachineHealtchcheckDecider(
-                minion_pool.id, minion_machine_id,
+                minion_pool.id, minion_machine.id,
                 on_successful_healthcheck=False))
 
         return machine_healthcheck_subflow
@@ -1145,11 +1194,13 @@ class MinionManagerServerEndpoint(object):
 
         # determine how many machines could be feasibly downscaled:
         machine_statuses = {
-            machine.id: machine.status
+            machine.id: machine.allocation_status
             for machine in minion_pool.minion_machines}
         ignorable_machine_statuses = [
             constants.MINION_MACHINE_STATUS_DEALLOCATING,
+            constants.MINION_MACHINE_STATUS_POWERING_OFF,
             constants.MINION_MACHINE_STATUS_ERROR,
+            constants.MINION_MACHINE_STATUS_POWER_ERROR,
             constants.MINION_MACHINE_STATUS_ERROR_DEPLOYING]
         max_minions_to_deallocate = (
             len([
@@ -1158,8 +1209,9 @@ class MinionManagerServerEndpoint(object):
                     minion_pool.minimum_minions))
         LOG.debug(
             "Determined minion pool '%s' machine deallocation number to be %d "
-            "based on machines stauses: %s",
-            minion_pool.id, max_minions_to_deallocate, machine_statuses)
+            "(pool minimum is '%d') based on current machines stauses: %s",
+            minion_pool.id, max_minions_to_deallocate,
+            minion_pool.minimum_minions, machine_statuses)
 
         # define refresh flow and process all relevant machines:
         pool_refresh_flow = unordered_flow.Flow(
@@ -1176,11 +1228,14 @@ class MinionManagerServerEndpoint(object):
             # point may be back online. Even if it isn't, the
             # subflow will redeploy it after the healthcheck fails:
             constants.MINION_MACHINE_STATUS_ERROR,
+            constants.MINION_MACHINE_STATUS_POWER_ERROR,
             constants.MINION_MACHINE_STATUS_ERROR_DEPLOYING]
 
         for machine in minion_pool.minion_machines:
-            if machine.status not in healthcheckable_machine_statuses:
-                skipped_machines[machine.id] = machine.status
+            if machine.allocation_status not in (
+                    healthcheckable_machine_statuses):
+                skipped_machines[machine.id] = (
+                    machine.allocation_status, machine.power_status)
                 continue
 
             minion_expired = True
@@ -1192,39 +1247,80 @@ class MinionManagerServerEndpoint(object):
 
             # deallocate the machine if it is expired:
             if max_minions_to_deallocate > 0 and minion_expired:
-                pool_refresh_flow.add(
-                    minion_manager_tasks.DeallocateMinionMachineTask(
-                        minion_pool.id, machine.id, minion_pool.platform))
+                if minion_pool.minion_retention_strategy == (
+                        constants.MINION_POOL_MACHINE_RETENTION_STRATEGY_POWEROFF):
+                    if machine.power_status == (
+                            constants.MINION_MACHINE_POWER_STATUS_POWERED_OFF):
+                        LOG.debug(
+                            "Skipping powering off minion machine '%s' of pool"
+                            " '%s' as it is already in powered off state.",
+                            machine.id, minion_pool.id)
+                        # NOTE: we count this machine out of the downscaling:
+                        max_minions_to_deallocate = max_minions_to_deallocate - 1
+                        continue
+                    pool_refresh_flow.add(
+                        minion_manager_tasks.PowerOffMinionMachineTask(
+                            minion_pool.id, machine.id, minion_pool.platform,
+                            fail_on_error=False,
+                            status_once_powered_off=(
+                                constants.MINION_MACHINE_STATUS_AVAILABLE)))
+                elif minion_pool.minion_retention_strategy == (
+                        constants.MINION_POOL_MACHINE_RETENTION_STRATEGY_DELETE):
+                    pool_refresh_flow.add(
+                        minion_manager_tasks.DeallocateMinionMachineTask(
+                                minion_pool.id, machine.id,
+                                minion_pool.platform))
+                else:
+                    raise exception.InvalidMinionPoolState(
+                        "Unknown minion pool retention strategy '%s' for pool "
+                        "'%s'" % (
+                            minion_pool.minion_retention_strategy,
+                            minion_pool.id))
                 max_minions_to_deallocate = max_minions_to_deallocate - 1
                 machines_to_deallocate.append(machine.id)
-            # else, perform a healthcheck on the machine:
-            else:
+            # else, perform a healthcheck on the machine if it is powered on:
+            elif machine.power_status == (
+                    constants.MINION_MACHINE_POWER_STATUS_POWERED_ON):
                 pool_refresh_flow.add(
                     self._get_healtchcheck_flow_for_minion_machine(
-                        minion_pool, machine.id, allocate_to_action=None,
+                        minion_pool, machine, allocate_to_action=None,
                         machine_status_on_success=(
                             constants.MINION_MACHINE_STATUS_AVAILABLE)))
                 machines_to_healthcheck.append(machine.id)
-
+            else:
+                skipped_machines[machine.id] = (
+                    machine.allocation_status, machine.power_status)
 
         # update DB entries for all machines and emit relevant events:
         if skipped_machines:
+            # NOTE: base_msg deliberately contains no format specifiers so it
+            # can be shared between the debug log (which includes the full
+            # allocation/power status of each skipped machine) and the pool
+            # event (which only lists the machine IDs). The IDs must therefore
+            # be appended explicitly instead of applying '%' to base_msg,
+            # which would never interpolate them into the message:
+            base_msg = (
+                "The following minion machines were skipped during the "
+                "refreshing of the minion pool as they were in other "
+                "statuses than the serviceable ones")
+            LOG.debug(
+                "[Pool '%s'] %s: %s", minion_pool.id, base_msg,
+                skipped_machines)
             self._add_minion_pool_event(
                 ctxt, minion_pool.id, constants.TASK_EVENT_INFO,
-                "The following minion machines were skipped during the "
-                "refreshing of ther minion pool as they were in other "
-                "statuses than the serviceable ones: %s" % skipped_machines)
+                "%s: %s" % (base_msg, list(skipped_machines.keys())))
 
         if machines_to_deallocate:
+            deallocation_action = "deallocated"
+            status_for_deallocated_machines = (
+                constants.MINION_MACHINE_STATUS_DEALLOCATING)
+            if minion_pool.minion_retention_strategy == (
+                    constants.MINION_POOL_MACHINE_RETENTION_STRATEGY_POWEROFF):
+                deallocation_action = "powered off"
+                status_for_deallocated_machines = (
+                    constants.MINION_MACHINE_STATUS_POWERING_OFF)
             self._add_minion_pool_event(
                 ctxt, minion_pool.id, constants.TASK_EVENT_INFO,
-                "The following minion machines will be deallocated as part "
+                "The following minion machines will be %s as part "
                 "of the refreshing of the minion pool: %s" % (
-                    machines_to_deallocate))
+                    deallocation_action, machines_to_deallocate))
             for machine in machines_to_deallocate:
-                db_api.set_minion_machine_status(
-                    ctxt, machine,
-                    constants.MINION_MACHINE_STATUS_DEALLOCATING)
+                db_api.set_minion_machine_allocation_status(
+                    ctxt, machine, status_for_deallocated_machines)
         else:
             self._add_minion_pool_event(
                 ctxt, minion_pool.id, constants.TASK_EVENT_INFO,
@@ -1237,7 +1333,7 @@ class MinionManagerServerEndpoint(object):
                 "of the refreshing of the minion pool: %s" % (
                     machines_to_healthcheck))
             for machine in machines_to_healthcheck:
-                db_api.set_minion_machine_status(
+                db_api.set_minion_machine_allocation_status(
                     ctxt, machine,
                     constants.MINION_MACHINE_STATUS_HEALTHCHECKING)
         else:
@@ -1266,12 +1362,12 @@ class MinionManagerServerEndpoint(object):
                     minion_pool_id, current_status,
                     acceptable_allocation_statuses))
 
-        healthcheck_flow = self._get_minion_pool_refresh_flow(
+        refresh_flow = self._get_minion_pool_refresh_flow(
             ctxt, minion_pool, requery=False)
-        if not healthcheck_flow:
+        if not refresh_flow:
             msg = (
-                "There are no minion machine healthchecks to be performed at "
-                "this time.")
+                "There are no minion machine refresh operations to be performed "
+                "at this time")
             db_api.add_minion_pool_event(
                 ctxt, minion_pool.id, constants.TASK_EVENT_INFO, msg)
             return self._get_minion_pool(ctxt, minion_pool.id)
@@ -1279,7 +1375,7 @@ class MinionManagerServerEndpoint(object):
         initial_store = self._get_pool_initial_taskflow_store_base(
             ctxt, minion_pool, endpoint_dict)
         self._taskflow_runner.run_flow_in_background(
-            healthcheck_flow, store=initial_store)
+            refresh_flow, store=initial_store)
         self._add_minion_pool_event(
             ctxt, minion_pool.id, constants.TASK_EVENT_INFO,
             "Begun minion pool refreshing process")
@@ -1451,16 +1547,17 @@ class MinionManagerServerEndpoint(object):
         unused_machine_states = [
             constants.MINION_MACHINE_STATUS_AVAILABLE,
             constants.MINION_MACHINE_STATUS_ERROR_DEPLOYING,
+            constants.MINION_MACHINE_STATUS_POWER_ERROR,
             constants.MINION_MACHINE_STATUS_ERROR]
         used_machines = {
             mch for mch in minion_pool.minion_machines
-            if mch.status not in unused_machine_states}
+            if mch.allocation_status not in unused_machine_states}
         if used_machines and raise_if_in_use:
             raise exception.InvalidMinionPoolState(
                 "Minion pool '%s' has one or more machines which are in an"
                 " active state: %s" % (
                     minion_pool.id, {
-                        mch.id: mch.status for mch in used_machines}))
+                        mch.id: mch.allocation_status for mch in used_machines}))
         return used_machines
 
     @minion_manager_utils.minion_pool_synchronized_op
@@ -1511,7 +1608,8 @@ class MinionManagerServerEndpoint(object):
 
         return self._get_minion_pool(ctxt, minion_pool.id)
 
-    def _get_minion_pool_deallocation_flow(self, minion_pool):
+    def _get_minion_pool_deallocation_flow(
+            self, minion_pool, raise_on_error=True):
         """ Returns a taskflow.Flow object pertaining to all the tasks
         required for deallocating a minion pool (machines and shared resources)
         """
@@ -1527,7 +1625,8 @@ class MinionManagerServerEndpoint(object):
         for machine in minion_pool.minion_machines:
             machines_flow.add(
                 minion_manager_tasks.DeallocateMinionMachineTask(
-                    minion_pool.id, machine.id, minion_pool.platform))
+                    minion_pool.id, machine.id, minion_pool.platform,
+                    raise_on_cleanup_failure=raise_on_error))
         # NOTE: bool(flow) == False if the flow has no child flows/tasks:
         if machines_flow:
             # transition pool to DEALLOCATING_MACHINES:
@@ -1599,7 +1698,7 @@ class MinionManagerServerEndpoint(object):
             ctxt, minion_pool.endpoint_id)
 
         deallocation_flow = self._get_minion_pool_deallocation_flow(
-            minion_pool)
+            minion_pool, raise_on_error=not force)
         initial_store = self._get_pool_deallocation_initial_store(
             ctxt, minion_pool, endpoint_dict)
 

+ 271 - 34
coriolis/minion_manager/rpc/tasks.py

@@ -64,6 +64,10 @@ MINION_POOL_REPORT_MIGRATION_ALLOCATION_FAILURE_TASK_NAME_FORMAT = (
     "migration-%s-minion-allocation-failure")
 MINION_POOL_REPORT_REPLICA_ALLOCATION_FAILURE_TASK_NAME_FORMAT = (
     "replica-%s-minion-allocation-failure")
+MINION_POOL_POWER_ON_MACHINE_TASK_NAME_FORMAT = (
+    "pool-%s-machine-%s-power-on")
+MINION_POOL_POWER_OFF_MACHINE_TASK_NAME_FORMAT = (
+    "pool-%s-machine-%s-power-off")
 
 
 class MinionManagerTaskEventMixin(object):
@@ -109,13 +113,19 @@ class MinionManagerTaskEventMixin(object):
             db_api.update_minion_machine(
                 ctxt, minion_machine_id, updated_values)
 
-    def _set_minion_machine_status(
+    def _set_minion_machine_allocation_status(
             self, ctxt, minion_pool_id, minion_machine_id, new_status):
         with minion_manager_utils.get_minion_pool_lock(
                 minion_pool_id, external=True):
-            db_api.set_minion_machine_status(
+            db_api.set_minion_machine_allocation_status(
                 ctxt, minion_machine_id, new_status)
 
+    def _set_minion_machine_power_status(
+            self, ctxt, minion_pool_id, minion_machine_id, new_status):
+        self._update_minion_machine(
+            ctxt, minion_pool_id, minion_machine_id,
+            {"power_status": new_status})
+
 
 class _BaseReportMinionAllocationFailureForActionTask(
         coriolis_taskflow_base.BaseCoriolisTaskflowTask,
@@ -228,23 +238,38 @@ class _BaseConfirmMinionAllocationForActionTask(
 
         def _check_minion_properties(
                 minion_machine, instance, minion_purpose="unknown"):
-            if minion_machine.status != (
+            if minion_machine.allocation_status != (
                     constants.MINION_MACHINE_STATUS_IN_USE):
                 raise exception.InvalidMinionMachineState(
-                    "Minion machine with ID '%s' is in '%s' status instead "
-                    "of the expected '%s' for it to be used as a '%s' "
-                    "minion for instance '%s' of transfer action '%s'." % (
-                        minion_machine.id, minion_machine.status,
+                    "Minion machine with ID '%s' of pool '%s' is in '%s' "
+                    "status instead of the expected '%s' for it to be used "
+                    "as a '%s' minion for instance '%s' of transfer "
+                    "action '%s'." % (
+                        minion_machine.id, minion_machine.pool_id,
+                        minion_machine.allocation_status,
                         constants.MINION_MACHINE_STATUS_IN_USE,
                         minion_purpose, instance, self._action_id))
 
             if minion_machine.allocated_action != self._action_id:
                 raise exception.InvalidMinionMachineState(
-                    "Minion machine with ID '%s' appears to be allocated to "
-                    "action with ID '%s' instead of the expected '%s' for it "
-                    "to be used as a '%s' minion for instance '%s'." % (
-                        minion_machine.id, minion_machine.allocated_action,
-                        self._action_id, minion_purpose, instance))
+                    "Minion machine with ID '%s' of pool '%s' appears to be "
+                    "allocated to action with ID '%s' instead of the expected"
+                    " '%s' for it to be used as a '%s' minion for instance '%s'." % (
+                        minion_machine.id, minion_machine.pool_id,
+                        minion_machine.allocated_action, self._action_id,
+                        minion_purpose, instance))
+
+            if minion_machine.power_status != (
+                    constants.MINION_MACHINE_POWER_STATUS_POWERED_ON):
+                raise exception.InvalidMinionMachineState(
+                    "Minion machine with ID '%s' of pool '%s' is in '%s' "
+                    "power status instead of the expected '%s' for it to be "
+                    "used as a '%s' minion for instance '%s' of transfer "
+                    "action '%s'." % (
+                        minion_machine.id, minion_machine.pool_id,
+                        minion_machine.power_status,
+                        constants.MINION_MACHINE_POWER_STATUS_POWERED_ON,
+                        minion_purpose, instance, self._action_id))
 
             # TODO(aznashwan): add extra checks for conn info schemas here?
             required_props = {
@@ -676,7 +701,8 @@ class AllocateMinionMachineTask(BaseMinionManangerTask):
         self._raise_on_cleanup_failure = raise_on_cleanup_failure
         super(AllocateMinionMachineTask, self).__init__(
             minion_pool_id, minion_machine_id, resource_deployment_task_type,
-            cleanup_task_runner_type=resource_cleanup_task_type, **kwargs)
+            cleanup_task_runner_type=resource_cleanup_task_type,
+            raise_on_cleanup_failure=raise_on_cleanup_failure, **kwargs)
 
     def _get_task_name(self, minion_pool_id, minion_machine_id):
         return MINION_POOL_ALLOCATE_MACHINE_TASK_NAME_FORMAT % (
@@ -686,13 +712,13 @@ class AllocateMinionMachineTask(BaseMinionManangerTask):
         minion_machine = self._get_minion_machine(
             context, self._minion_machine_id, raise_if_not_found=False)
         if minion_machine:
-            if minion_machine.status != (
+            if minion_machine.allocation_status != (
                     constants.MINION_MACHINE_STATUS_UNINITIALIZED):
                 raise exception.InvalidMinionMachineState(
                     "Minion machine entry with ID '%s' already exists within "
                     "the DB and it is in '%s' status instead of the expected "
                     "'%s' status. Existing machine's properties are: %s" % (
-                        self._minion_machine_id, minion_machine.status,
+                        self._minion_machine_id, minion_machine.allocation_status,
                         constants.MINION_MACHINE_STATUS_UNINITIALIZED,
                         minion_machine.to_dict()))
             if minion_machine.pool_id != self._minion_pool_id:
@@ -717,15 +743,17 @@ class AllocateMinionMachineTask(BaseMinionManangerTask):
                 "[Task '%s'] Found existing entry in DB for minion machine "
                 "'%s'. Reusing that for deployment task.",
                 self._task_name, self._minion_machine_id)
-            self._set_minion_machine_status(
+            self._set_minion_machine_allocation_status(
                 context, self._minion_pool_id, self._minion_machine_id,
                 constants.MINION_MACHINE_STATUS_ALLOCATING)
         else:
             minion_machine = models.MinionMachine()
             minion_machine.id = self._minion_machine_id
             minion_machine.pool_id = self._minion_pool_id
-            minion_machine.status = (
+            minion_machine.allocation_status = (
                 constants.MINION_MACHINE_STATUS_ALLOCATING)
+            minion_machine.power_status = (
+                constants.MINION_MACHINE_POWER_STATUS_UNINITIALIZED)
             log_msg = (
                 "[Task '%s'] Adding new minion machine with ID '%s' "
                 "to the DB" % (self._task_name, self._minion_machine_id))
@@ -756,7 +784,7 @@ class AllocateMinionMachineTask(BaseMinionManangerTask):
             res = super(AllocateMinionMachineTask, self).execute(
                 context, origin, destination, execution_info)
         except:
-            self._set_minion_machine_status(
+            self._set_minion_machine_allocation_status(
                 context, self._minion_pool_id, self._minion_machine_id,
                 constants.MINION_MACHINE_STATUS_ERROR_DEPLOYING)
             raise
@@ -768,14 +796,15 @@ class AllocateMinionMachineTask(BaseMinionManangerTask):
 
         updated_values = {
             "last_used_at": timeutils.utcnow(),
-            "status": constants.MINION_MACHINE_STATUS_AVAILABLE,
+            "allocation_status": constants.MINION_MACHINE_STATUS_AVAILABLE,
+            "power_status": constants.MINION_MACHINE_POWER_STATUS_POWERED_ON,
             "connection_info": res['minion_connection_info'],
             "provider_properties": res['minion_provider_properties'],
             "backup_writer_connection_info": res[
                 "minion_backup_writer_connection_info"]}
         if self._allocate_to_action:
             updated_values["allocated_action"] = self._allocate_to_action
-            updated_values["status"] = (
+            updated_values["allocation_status"] = (
                 constants.MINION_MACHINE_STATUS_IN_USE)
         self._update_minion_machine(
             context, self._minion_pool_id, self._minion_machine_id,
@@ -870,7 +899,7 @@ class AllocateMinionMachineTask(BaseMinionManangerTask):
 
         if not minion_provider_properties:
             LOG.debug(
-                "[Task '%s'] Reversion for Minion Machine '%s' (pool '%s')"
+                "[Task '%s'] Reversion for Minion Machine '%s' (pool '%s') "
                 "found no 'minion_provider_properties'. Presuming the machine "
                 "never got created on the cloud and skiping any deletion task",
                 self._task_name, self._minion_machine_id,
@@ -903,15 +932,16 @@ class DeallocateMinionMachineTask(BaseMinionManangerTask):
 
     def __init__(
             self, minion_pool_id, minion_machine_id, minion_pool_type,
-            **kwargs):
+            raise_on_cleanup_failure=True, **kwargs):
         resource_deletion_task_type = (
             constants.TASK_TYPE_DELETE_SOURCE_MINION_MACHINE)
+        self._raise_on_cleanup_failure = raise_on_cleanup_failure
         if minion_pool_type != constants.PROVIDER_PLATFORM_SOURCE:
             resource_deletion_task_type = (
                 constants.TASK_TYPE_DELETE_DESTINATION_MINION_MACHINE)
         super(DeallocateMinionMachineTask, self).__init__(
             minion_pool_id, minion_machine_id, resource_deletion_task_type,
-            **kwargs)
+            raise_on_cleanup_failure=raise_on_cleanup_failure, **kwargs)
 
     def _get_task_name(self, minion_pool_id, minion_machine_id):
         return MINION_POOL_DEALLOCATE_MACHINE_TASK_NAME_FORMAT % (
@@ -932,13 +962,29 @@ class DeallocateMinionMachineTask(BaseMinionManangerTask):
                 self._minion_machine_id))
 
         if machine.provider_properties:
-            self._set_minion_machine_status(
+            self._set_minion_machine_allocation_status(
                 context, self._minion_pool_id, self._minion_machine_id,
                 constants.MINION_MACHINE_STATUS_DEALLOCATING)
             execution_info = {
                 "minion_provider_properties": machine.provider_properties}
-            _ = super(DeallocateMinionMachineTask, self).execute(
-                context, origin, destination, execution_info)
+            try:
+                _ = super(DeallocateMinionMachineTask, self).execute(
+                    context, origin, destination, execution_info)
+            except Exception as ex:
+                base_msg = (
+                    "Exception occured while deallocating minion machine '%s' "
+                    "There might be leftover instance resources requiring "
+                    "manual cleanup" % self._minion_machine_id)
+                LOG.warn(
+                    "[Task '%s'] %s. Error was: %s",
+                    self._task_name, base_msg, utils.get_exception_details())
+                event_level = constants.TASK_EVENT_INFO
+                if self._raise_on_cleanup_failure:
+                    event_level = constants.TASK_EVENT_ERROR
+                self._add_minion_pool_event(
+                    context, base_msg, level=event_level)
+                if self._raise_on_cleanup_failure:
+                    raise
         else:
             self._add_minion_pool_event(
                 context,
@@ -986,15 +1032,43 @@ class HealthcheckMinionMachineTask(BaseMinionManangerTask):
             "healthy": True,
             "error": None}
 
-        machine = self._get_minion_machine(context, self._minion_machine_id)
+        machine = self._get_minion_machine(
+            context, self._minion_machine_id, raise_if_not_found=False)
         if not machine:
             LOG.info(
                 "[Task '%s'] Could not find machine with ID '%s' in the DB. "
                 "Presuming it was already deleted so healthcheck failed.",
                 self._task_name, self._minion_machine_id)
-            return {
-                "healthy": False,
-                "error": "Machine not found."}
+            base_msg = (
+                "Could not find minion machine DB entry with ID '%s' for "
+                "healtcheck." % self._minion_machine_id)
+            self._add_minion_pool_event(
+                context,
+                "%s Reporting healthcheck as failed" % base_msg,
+                level=constants.TASK_EVENT_WARNING)
+
+            if self._fail_on_error:
+                raise exception.InvalidMinionMachineState(base_msg)
+            return {"healthy": False, "error": base_msg}
+
+        machine_error_statuses = [
+            constants.MINION_MACHINE_STATUS_ERROR,
+            constants.MINION_MACHINE_STATUS_POWER_ERROR,
+            constants.MINION_MACHINE_STATUS_ERROR_DEPLOYING]
+        if machine.allocation_status in machine_error_statuses:
+            base_msg = (
+                "Minion Machine with ID '%s' is marked as '%s' in the DB." % (
+                    self._minion_machine_id, machine.allocation_status))
+            LOG.debug(
+                "[Task '%s'] %s" % (self._task_name, base_msg))
+            self._add_minion_pool_event(
+                context,
+                "%s Reporting healthcheck as failed" % base_msg,
+                level=constants.TASK_EVENT_WARNING)
+
+            if self._fail_on_error:
+                raise exception.InvalidMinionMachineState(base_msg)
+            return {"healthy": False, "error": base_msg}
 
         self._add_minion_pool_event(
             context,
@@ -1011,7 +1085,7 @@ class HealthcheckMinionMachineTask(BaseMinionManangerTask):
                 context,
                 "Successfully healtchecked minion machine with internal "
                 "pool ID '%s'" % self._minion_machine_id)
-            self._set_minion_machine_status(
+            self._set_minion_machine_allocation_status(
                 context, self._minion_pool_id, self._minion_machine_id,
                 self._machine_status_on_success)
         except Exception as ex:
@@ -1025,7 +1099,7 @@ class HealthcheckMinionMachineTask(BaseMinionManangerTask):
                 "Full trace was:\n%s", self._task_name,
                 self._minion_machine_id, self._minion_pool_id,
                 utils.get_exception_details())
-            self._set_minion_machine_status(
+            self._set_minion_machine_allocation_status(
                 context, self._minion_pool_id, self._minion_machine_id,
                 constants.MINION_MACHINE_STATUS_ERROR)
             if not self._fail_on_error:
@@ -1080,6 +1154,169 @@ class MinionMachineHealtchcheckDecider(object):
         else:
             LOG.debug(
                 "Healtcheck task '%s' denied worker health. Decider "
-                "returning %s", healthcheck_task_name,
-                not self._on_success)
+                "returning %s. Error mesage was: %s",
+                healthcheck_task_name, not self._on_success,
+                healtcheck_result.get('error'))
             return not self._on_success
+
+
+class PowerOnMinionMachineTask(BaseMinionManangerTask):
+
+    def __init__(
+            self, minion_pool_id, minion_machine_id, minion_pool_type,
+            fail_on_error=True, **kwargs):
+        self._fail_on_error = fail_on_error
+        power_on_task_type = (
+            constants.TASK_TYPE_POWER_ON_SOURCE_MINION)
+        if minion_pool_type != constants.PROVIDER_PLATFORM_SOURCE:
+            power_on_task_type = (
+                constants.TASK_TYPE_POWER_ON_DESTINATION_MINION)
+        super(PowerOnMinionMachineTask, self).__init__(
+            minion_pool_id, minion_machine_id, power_on_task_type,
+            **kwargs)
+
+    def _get_task_name(self, minion_pool_id, minion_machine_id):
+        return MINION_POOL_POWER_ON_MACHINE_TASK_NAME_FORMAT % (
+            minion_pool_id, minion_machine_id)
+
+    def execute(self, context, origin, destination, task_info):
+        machine = self._get_minion_machine(
+            context, self._minion_machine_id, raise_if_not_found=True)
+
+        if machine.power_status == constants.MINION_MACHINE_POWER_STATUS_POWERED_ON:
+            LOG.debug(
+                "[Task '%s'] Minion machine with ID '%s' from pool '%s' is "
+                "already marked as powered on. Returning early." % (
+                    self._task_name, self._minion_machine_id,
+                    self._minion_pool_id))
+            return task_info
+
+        if machine.power_status != constants.MINION_MACHINE_POWER_STATUS_POWERED_OFF:
+            raise exception.InvalidMinionMachineState(
+                "Minion machine with ID '%s' from pool '%s' is in '%s' state "
+                "instead of the expected '%s' required for it to be powered "
+                "on." % (
+                    self._minion_machine_id, self._minion_pool_id,
+                    machine.power_status,
+                    constants.MINION_MACHINE_POWER_STATUS_POWERED_OFF))
+
+        execution_info = {
+            "minion_provider_properties": machine.provider_properties}
+        try:
+            self._set_minion_machine_power_status(
+                context, self._minion_pool_id,
+                self._minion_machine_id,
+                constants.MINION_MACHINE_POWER_STATUS_POWERING_ON)
+            _ = super(PowerOnMinionMachineTask, self).execute(
+                context, origin, destination, execution_info)
+            self._set_minion_machine_power_status(
+                context, self._minion_pool_id,
+                self._minion_machine_id,
+                constants.MINION_MACHINE_POWER_STATUS_POWERED_ON)
+            self._add_minion_pool_event(
+                context,
+                "Successfully powered on minion machine with internal pool "
+                "ID '%s'" % self._minion_machine_id)
+        except Exception as ex:
+            base_msg = (
+                "[Task '%s'] Exception occurred while powering on minion "
+                "machine with ID '%s' of pool '%s'." % (
+                    self._task_name, self._minion_machine_id,
+                    self._minion_pool_id))
+            LOG.warn(
+                "%s Error details were: %s" % (
+                    base_msg, utils.get_exception_details()))
+            self._add_minion_pool_event(
+                context,
+                "Exception occurred while powering on minion machine with "
+                "internal pool ID '%s'. The minion machine will be marked "
+                "as ERROR'd and automatically redeployed later" % (
+                    self._minion_machine_id),
+                level=constants.TASK_EVENT_ERROR)
+            self._set_minion_machine_allocation_status(
+                context, self._minion_pool_id, self._minion_machine_id,
+                constants.MINION_MACHINE_STATUS_POWER_ERROR)
+            if self._fail_on_error:
+                raise exception.CoriolisException(base_msg)
+
+        return task_info
+
+
+class PowerOffMinionMachineTask(BaseMinionManangerTask):
+
+    def __init__(
+            self, minion_pool_id, minion_machine_id, minion_pool_type,
+            fail_on_error=True,
+            status_once_powered_off=constants.MINION_MACHINE_STATUS_AVAILABLE,
+            **kwargs):
+        self._fail_on_error = fail_on_error
+        self._status_once_powered_off = status_once_powered_off
+        power_on_task_type = (
+            constants.TASK_TYPE_POWER_OFF_SOURCE_MINION)
+        if minion_pool_type != constants.PROVIDER_PLATFORM_SOURCE:
+            power_on_task_type = (
+                constants.TASK_TYPE_POWER_OFF_DESTINATION_MINION)
+        super(PowerOffMinionMachineTask, self).__init__(
+            minion_pool_id, minion_machine_id, power_on_task_type,
+            **kwargs)
+
+    def _get_task_name(self, minion_pool_id, minion_machine_id):
+        return MINION_POOL_POWER_OFF_MACHINE_TASK_NAME_FORMAT % (
+            minion_pool_id, minion_machine_id)
+
+    def execute(self, context, origin, destination, task_info):
+        machine = self._get_minion_machine(
+            context, self._minion_machine_id, raise_if_not_found=True)
+
+        if machine.power_status == (
+                constants.MINION_MACHINE_POWER_STATUS_POWERED_OFF):
+            LOG.debug(
+                "[Task '%s'] Minion machine with ID '%s' from pool '%s' is "
+                "already marked as powered off. Returning early." % (
+                    self._task_name, self._minion_machine_id,
+                    self._minion_pool_id))
+            return task_info
+
+        execution_info = {
+            "minion_provider_properties": machine.provider_properties}
+        try:
+            self._set_minion_machine_power_status(
+                context, self._minion_pool_id,
+                self._minion_machine_id,
+                constants.MINION_MACHINE_POWER_STATUS_POWERING_OFF)
+            _ = super(PowerOffMinionMachineTask, self).execute(
+                context, origin, destination, execution_info)
+            self._set_minion_machine_power_status(
+                context, self._minion_pool_id,
+                self._minion_machine_id,
+                constants.MINION_MACHINE_POWER_STATUS_POWERED_OFF)
+            self._set_minion_machine_allocation_status(
+                context, self._minion_pool_id, self._minion_machine_id,
+                self._status_once_powered_off)
+            self._add_minion_pool_event(
+                context,
+                "Successfully powered off minion machine with internal pool "
+                "ID '%s'" % self._minion_machine_id)
+        except Exception as ex:
+            base_msg = (
+                "[Task '%s'] Exception occurred while powering off minion "
+                "machine with ID '%s' of pool '%s'." % (
+                    self._task_name, self._minion_machine_id,
+                    self._minion_pool_id))
+            self._add_minion_pool_event(
+                context,
+                "Exception occurred while powering off minion machine with "
+                "internal pool ID '%s'. The minion machine will be marked "
+                "as ERROR'd and automatically redeployed later." % (
+                    self._minion_machine_id),
+                level=constants.TASK_EVENT_ERROR)
+            self._set_minion_machine_allocation_status(
+                context, self._minion_pool_id, self._minion_machine_id,
+                constants.MINION_MACHINE_STATUS_POWER_ERROR)
+            LOG.warn(
+                "%s Error details were: %s" % (
+                    base_msg, utils.get_exception_details()))
+            if self._fail_on_error:
+                raise exception.CoriolisException(base_msg)
+
+        return task_info

+ 9 - 8
coriolis/taskflow/base.py

@@ -187,9 +187,9 @@ class BaseRunWorkerTask(BaseCoriolisTaskflowTask):
             ctxt, task_info, origin, destination, retry_count=retry_count,
             retry_period=retry_period, random_choice=True)
         LOG.debug(
-            "Was offered the following worker service for executing TaskFlow "
-            "task '%s' (taskflow ID %s): %s",
-            self._task_name, task_id, worker_service['id'])
+            "[Task '%s'] Was offered the following worker service for executing "
+            "Taskflow worker task '%s': %s",
+                self._task_name, task_id, worker_service['id'])
 
         return rpc_worker_client.WorkerClient.from_service_definition(
             worker_service, timeout=rpc_timeout)
@@ -201,15 +201,16 @@ class BaseRunWorkerTask(BaseCoriolisTaskflowTask):
 
         try:
             LOG.debug(
-                "Starting to run task '%s' (type '%s') on worker service." % (
-                    self._task_name, task_type))
+                "[Task '%s'] Starting to run task '%s' (type '%s') "
+                "on worker service." % (
+                    self._task_id, self._task_name, task_type))
             res = worker_rpc.run_task(
                 ctxt, None, self._task_id, task_type, origin, destination,
                 self._task_instance, task_info)
             LOG.debug(
-                "Taskflow task '%s' (type %s) has successfully run and returned "
-                "the following info: %s" % (
-                    task_id, task_type, res))
+                "[Task '%s'] Taskflow worker task '%s' (type %s) has "
+                "successfully run and returned the following info: %s" % (
+                    self._task_name, task_id, task_type, res))
             return res
         except Exception as ex:
             LOG.debug(

+ 20 - 0
coriolis/taskflow/utils.py

@@ -0,0 +1,20 @@
+# Copyright 2020 Cloudbase Solutions Srl
+# All Rights Reserved.
+
+from oslo_log import log as logging
+
+
+LOG = logging.getLogger(__name__)
+
+
+class DummyDecider(object):
+    """ A callable to decide execution in a pre-defined manner. """
+
+    def __init__(self, allow=True):
+        self._allow = allow
+
+    def __call__(self, history):
+        LOG.debug(
+            "Dummy decider returning '%s'. Provided task history was: %s",
+            self._allow, history)
+        return self._allow

+ 9 - 1
coriolis/tasks/factory.py

@@ -132,7 +132,15 @@ _TASKS_MAP = {
     constants.TASK_TYPE_HEALTHCHECK_SOURCE_MINION:
         minion_pool_tasks.HealthcheckSourceMinionMachineTask,
     constants.TASK_TYPE_HEALTHCHECK_DESTINATION_MINION:
-        minion_pool_tasks.HealthcheckDestinationMinionTask
+        minion_pool_tasks.HealthcheckDestinationMinionTask,
+    constants.TASK_TYPE_POWER_ON_SOURCE_MINION:
+        minion_pool_tasks.PowerOnSourceMinionTask,
+    constants.TASK_TYPE_POWER_OFF_SOURCE_MINION:
+        minion_pool_tasks.PowerOffSourceMinionTask,
+    constants.TASK_TYPE_POWER_ON_DESTINATION_MINION:
+        minion_pool_tasks.PowerOnDestinationMinionTask,
+    constants.TASK_TYPE_POWER_OFF_DESTINATION_MINION:
+        minion_pool_tasks.PowerOffDestinationMinionTask
 }
 
 

+ 93 - 0
coriolis/tasks/minion_pool_tasks.py

@@ -925,3 +925,96 @@ class HealthcheckDestinationMinionTask(_BaseHealthcheckMinionMachineTask):
     @classmethod
     def get_required_platform(cls):
         return constants.TASK_PLATFORM_DESTINATION
+
+
+class _BasePowerCycleMinionTask(base.TaskRunner):
+    """Base for tasks running a provider power cycle op on a minion."""
+
+    @classmethod
+    def get_required_platform(cls):
+        raise NotImplementedError(
+            "No minion power cycle platform specified")
+
+    @classmethod
+    def get_required_provider_types(cls):
+        return _get_required_minion_pool_provider_types_for_platform(
+            cls.get_required_platform())
+
+    @classmethod
+    def get_required_task_info_properties(cls):
+        return ["minion_provider_properties"]
+
+    @classmethod
+    def get_returned_task_info_properties(cls):
+        return []
+
+    @classmethod
+    def _get_minion_power_cycle_op(cls, provider):
+        raise NotImplementedError(
+            "No minion power cycle operation implemented.")
+
+    def _run(self, ctxt, instance, origin, destination,
+             task_info, event_handler):
+        """Run the subclass-selected power op; returns an empty dict."""
+        required_platform = self.get_required_platform()
+        if required_platform == constants.TASK_PLATFORM_SOURCE:
+            platform_to_target = origin
+        elif required_platform == constants.TASK_PLATFORM_DESTINATION:
+            platform_to_target = destination
+        else:
+            raise NotImplementedError(
+                "Unknown minion power cycle platform '%s'" % (
+                    required_platform))
+
+        connection_info = base.get_connection_info(ctxt, platform_to_target)
+        provider_type = self.get_required_provider_types()[
+            required_platform][0]  # first provider type listed
+        provider = providers_factory.get_provider(
+            platform_to_target["type"], provider_type, event_handler)
+        power_cycle_op = self._get_minion_power_cycle_op(provider)
+        minion_properties = task_info['minion_provider_properties']
+        power_cycle_op(ctxt, connection_info, minion_properties)
+
+        return {}
+
+
+class _BasePowerOnMinionTask(_BasePowerCycleMinionTask):
+    """Power cycle task whose operation is the provider's start_minion."""
+    @classmethod
+    def _get_minion_power_cycle_op(cls, provider):
+        return provider.start_minion
+
+
+class PowerOnSourceMinionTask(_BasePowerOnMinionTask):
+    """Power-on minion task targeting the source platform."""
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_SOURCE
+
+
+class PowerOnDestinationMinionTask(_BasePowerOnMinionTask):
+    """Power-on minion task targeting the destination platform."""
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_DESTINATION
+
+
+class _BasePowerOffMinionTask(_BasePowerCycleMinionTask):
+    """Power cycle task whose operation is the provider's shutdown_minion."""
+    @classmethod
+    def _get_minion_power_cycle_op(cls, provider):
+        return provider.shutdown_minion
+
+
+class PowerOffSourceMinionTask(_BasePowerOffMinionTask):
+    """Power-off minion task targeting the source platform."""
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_SOURCE
+
+
+class PowerOffDestinationMinionTask(_BasePowerOffMinionTask):
+    """Power-off minion task targeting the destination platform."""
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_DESTINATION