Procházet zdrojové kódy

Add minion pool healthchecks.

Nashwan Azhari před 5 roky
rodič
revize
3671ad741a

+ 15 - 0
coriolis/api/v1/minion_pool_actions.py

@@ -30,6 +30,21 @@ class MinionPoolActionsController(api_wsgi.Controller):
         except exception.InvalidParameterValue as ex:
             raise exc.HTTPNotFound(explanation=ex.msg)
 
+    @api_wsgi.action('healthcheck')
+    def _healthcheck_pool(self, req, id, body):
+        """ Handles the 'healthcheck' action for the minion pool `id`.
+
+        Enforces the 'healthcheck' minion pool policy on the request
+        context, asks the minion pool API to healthcheck the pool and
+        returns the single-pool view of the result. Both NotFound and
+        InvalidParameterValue are reported to the client as HTTP 404.
+        """
+        ctxt = req.environ['coriolis.context']
+        policy_label = minion_pool_policies.get_minion_pools_policy_label(
+            "healthcheck")
+        ctxt.can(policy_label)
+        try:
+            pool = self.minion_pool_api.healthcheck_minion_pool(ctxt, id)
+            return minion_pool_view.single(req, pool)
+        except (exception.NotFound,
+                exception.InvalidParameterValue) as ex:
+            raise exc.HTTPNotFound(explanation=ex.msg)
+
     @api_wsgi.action('deallocate')
     def _deallocate_pool(self, req, id, body):
         context = req.environ['coriolis.context']

+ 7 - 1
coriolis/constants.py

@@ -171,6 +171,8 @@ TASK_TYPE_RELEASE_SOURCE_MINION = "RELEASE_SOURCE_MINION"
 TASK_TYPE_RELEASE_DESTINATION_MINION = "RELEASE_DESTINATION_MINION"
 TASK_TYPE_RELEASE_OSMORPHING_MINION = "RELEASE_OSMORPHING_MINION"
 TASK_TYPE_COLLECT_OSMORPHING_INFO = "COLLECT_OS_MORPHING_INFO"
+TASK_TYPE_HEALTHCHECK_SOURCE_MINION = "HEALTHCHECK_SOURCE_MINION"
+TASK_TYPE_HEALTHCHECK_DESTINATION_MINION = "HEALTHCHECK_DESTINATION_MINION"
 
 MINION_POOL_OPERATIONS_TASKS = [
     TASK_TYPE_VALIDATE_SOURCE_MINION_POOL_OPTIONS,
@@ -180,7 +182,10 @@ MINION_POOL_OPERATIONS_TASKS = [
     TASK_TYPE_CREATE_SOURCE_MINION_MACHINE,
     TASK_TYPE_CREATE_DESTINATION_MINION_MACHINE,
     TASK_TYPE_TEAR_DOWN_SOURCE_POOL_SHARED_RESOURCES,
-    TASK_TYPE_TEAR_DOWN_DESTINATION_POOL_SHARED_RESOURCES]
+    TASK_TYPE_TEAR_DOWN_DESTINATION_POOL_SHARED_RESOURCES,
+    TASK_TYPE_HEALTHCHECK_SOURCE_MINION,
+    TASK_TYPE_HEALTHCHECK_DESTINATION_MINION
+]
 
 TASK_PLATFORM_SOURCE = "source"
 TASK_PLATFORM_DESTINATION = "destination"
@@ -355,3 +360,4 @@ MINION_MACHINE_STATUS_UNINITIALIZED = "UNINITIALIZED"
 MINION_MACHINE_STATUS_RECONFIGURING = "RECONFIGURING"
 MINION_MACHINE_STATUS_AVAILABLE = "AVAILABLE"
 MINION_MACHINE_STATUS_ALLOCATED = "ALLOCATED"
+MINION_MACHINE_STATUS_RESERVED = "RESERVED"

+ 3 - 1
coriolis/db/api.py

@@ -1209,7 +1209,8 @@ def update_minion_machine(context, minion_machine_id, updated_values):
 
     updateable_fields = [
         "connection_info", "provider_properties", "status",
-        "backup_writer_connection_info", "allocated_action"]
+        "backup_writer_connection_info", "allocated_action",
+        "allocated_at"]
     _update_sqlalchemy_object_fields(
         minion_machine, updateable_fields, updated_values)
 
@@ -1236,6 +1237,7 @@ def set_minion_machines_allocation_statuses(
                 machine.id, machine.status, allocation_status,
                 machine.allocated_action, action_id))
         machine.allocated_action = action_id
+        machine.allocated_at = timeutils.utcnow()
         machine.status = allocation_status
 
 

+ 2 - 0
coriolis/db/sqlalchemy/migrate_repo/versions/016_adds_minion_vm_pools.py

@@ -86,6 +86,8 @@ def upgrade(migrate_engine):
                 nullable=False),
             sqlalchemy.Column(
                 'allocated_action', sqlalchemy.String(36), nullable=True),
+            sqlalchemy.Column(
+                'allocated_at', sqlalchemy.DateTime, nullable=True),
             sqlalchemy.Column(
                 'status', sqlalchemy.String(255), nullable=False,
                 default=lambda: "UNKNOWN"),

+ 4 - 0
coriolis/db/sqlalchemy/models.py

@@ -490,6 +490,9 @@ class MinionMachine(BASE, models.TimestampMixin, models.ModelBase,
     allocated_action = sqlalchemy.Column(
         sqlalchemy.String(36), nullable=True)
 
+    allocated_at = sqlalchemy.Column(
+        sqlalchemy.types.DateTime, nullable=True)
+
     connection_info = sqlalchemy.Column(
         types.Json, nullable=True)
 
@@ -512,6 +515,7 @@ class MinionMachine(BASE, models.TimestampMixin, models.ModelBase,
             "status": self.status,
             "connection_info": self.connection_info,
             "allocated_action": self.allocated_action,
+            "allocated_at": self.allocated_at,
             "backup_writer_connection_info": (
                 self.backup_writer_connection_info),
             "provider_properties": self.provider_properties

+ 5 - 0
coriolis/minion_manager/rpc/client.py

@@ -100,6 +100,11 @@ class MinionManagerClient(object):
             ctxt, "allocate_minion_pool",
             minion_pool_id=minion_pool_id)
 
+    def healthcheck_minion_pool(self, ctxt, minion_pool_id):
+        """ Issues the 'healthcheck_minion_pool' call through the RPC
+        client for the given pool ID and returns whatever it returns. """
+        rpc_kwargs = {"minion_pool_id": minion_pool_id}
+        return self._client.call(
+            ctxt, "healthcheck_minion_pool", **rpc_kwargs)
+
     def deallocate_minion_pool(
             self, ctxt, minion_pool_id, force=False):
         return self._client.call(

+ 97 - 20
coriolis/minion_manager/rpc/server.py

@@ -8,6 +8,7 @@ import uuid
 from oslo_concurrency import lockutils
 from oslo_config import cfg
 from oslo_log import log as logging
+from taskflow.patterns import graph_flow
 from taskflow.patterns import linear_flow
 from taskflow.patterns import unordered_flow
 
@@ -559,6 +560,92 @@ class MinionManagerServerEndpoint(object):
                         "No minion machines were found to be associated "
                         "with action with base_id '%s'.", action['base_id'])
 
+    def _get_minion_pool_healthcheck_flow(
+            self, ctxt, minion_pool, requery=True):
+        """ Returns an unordered taskflow Flow which healthchecks every
+        machine of the given minion pool.
+
+        Each machine gets its own graph subflow containing a healthcheck
+        task linked to a linear "deallocate then allocate" subflow. The
+        link is guarded by a decider created with
+        `on_successful_healthcheck=False`.
+
+        :param ctxt: request context, only used when re-querying the pool.
+        :param minion_pool: pool object exposing `id`, `platform` and
+            `minion_machines`.
+        :param requery: when True, the pool is fetched again (machines
+            included) through `_get_minion_pool` before building the flow.
+        """
+        if requery:
+            minion_pool = self._get_minion_pool(
+                ctxt, minion_pool.id, include_machines=True,
+                include_progress_updates=False, include_events=False)
+
+        flow_name = (
+            minion_manager_tasks.MINION_POOL_HEALTHCHECK_FLOW_NAME_FORMAT % (
+                minion_pool.id))
+        pool_flow = unordered_flow.Flow(flow_name)
+
+        for machine in minion_pool.minion_machines:
+            name_args = (minion_pool.id, machine.id)
+            task_args = (minion_pool.id, machine.id, minion_pool.platform)
+
+            # per-machine graph subflow hosting both the healthcheck task
+            # and the conditional reallocation subflow:
+            check_subflow_name_format = (
+                minion_manager_tasks.
+                MINION_POOL_HEALTHCHECK_MACHINE_SUBFLOW_NAME_FORMAT)
+            check_subflow = graph_flow.Flow(
+                check_subflow_name_format % name_args)
+
+            # the healthcheck task itself:
+            check_task = minion_manager_tasks.HealthcheckMinionMachineTask(
+                *task_args)
+            check_subflow.add(check_task)
+
+            # reallocation is a deallocation followed by a fresh allocation:
+            realloc_subflow_name_format = (
+                minion_manager_tasks.
+                MINION_POOL_REALLOCATE_MACHINE_SUBFLOW_NAME_FORMAT)
+            realloc_subflow = linear_flow.Flow(
+                realloc_subflow_name_format % name_args)
+            realloc_subflow.add(
+                minion_manager_tasks.DeallocateMinionMachineTask(*task_args))
+            realloc_subflow.add(
+                minion_manager_tasks.AllocateMinionMachineTask(*task_args))
+            check_subflow.add(realloc_subflow)
+
+            # gate the reallocation subflow on the healthcheck's result:
+            decider = (
+                minion_manager_tasks.HealthcheckMinionMachineTask.
+                make_minion_machine_healtcheck_decider(
+                    minion_pool.id, machine.id,
+                    on_successful_healthcheck=False))
+            check_subflow.link(check_task, realloc_subflow, decider=decider)
+
+            pool_flow.add(check_subflow)
+
+        return pool_flow
+
+    @minion_manager_utils.minion_pool_synchronized_op
+    def healthcheck_minion_pool(self, ctxt, minion_pool_id):
+        """ Starts a background healthcheck of the machines of a pool.
+
+        The pool must have the `MINION_POOL_STATUS_ALLOCATED` status. Its
+        status is set to `MINION_POOL_STATUS_POOL_MAINTENANCE` before the
+        healthcheck flow is handed to the taskflow runner; if either step
+        raises, the previous status is written back and the error is
+        re-raised.
+
+        :param ctxt: request context.
+        :param minion_pool_id: ID of the pool to healthcheck.
+        :raises exception.InvalidMinionPoolState: if the pool's status is
+            not an acceptable one.
+        :returns: the pool as returned by `_get_minion_pool`.
+        """
+        LOG.info("Attempting to healthcheck Minion Pool '%s'.", minion_pool_id)
+        minion_pool = self._get_minion_pool(
+            ctxt, minion_pool_id, include_events=False, include_machines=True,
+            include_progress_updates=False)
+        endpoint_dict = self._conductor_client.get_endpoint(
+            ctxt, minion_pool.endpoint_id)
+        acceptable_allocation_statuses = [
+            constants.MINION_POOL_STATUS_ALLOCATED]
+        current_status = minion_pool.status
+        if current_status not in acceptable_allocation_statuses:
+            raise exception.InvalidMinionPoolState(
+                "Minion machines for pool '%s' cannot be healthchecked as the "
+                "pool is in '%s' state instead of the expected %s." % (
+                    minion_pool_id, current_status,
+                    acceptable_allocation_statuses))
+
+        # the pool (with its machines) was just fetched above, so the flow
+        # builder is told not to query it again:
+        healthcheck_flow = self._get_minion_pool_healthcheck_flow(
+            ctxt, minion_pool, requery=False)
+        initial_store = self._get_pool_initial_taskflow_store_base(
+            ctxt, minion_pool, endpoint_dict)
+
+        try:
+            db_api.set_minion_pool_status(
+                ctxt, minion_pool_id,
+                constants.MINION_POOL_STATUS_POOL_MAINTENANCE)
+            self._taskflow_runner.run_flow_in_background(
+                healthcheck_flow, store=initial_store)
+        # bare except: the previous status is restored on any error before
+        # the error is re-raised.
+        # NOTE(review): nothing visible in the healthcheck flow moves the
+        # pool back out of POOL_MAINTENANCE once the flow completes --
+        # confirm this is handled elsewhere.
+        except:
+            db_api.set_minion_pool_status(
+                ctxt, minion_pool_id, current_status)
+            raise
+
+        return self._get_minion_pool(ctxt, minion_pool.id)
+
+
     def _get_minion_pool_allocation_flow(self, minion_pool):
         """ Returns a taskflow.Flow object pertaining to all the tasks
         required for allocating a minion pool (validation, shared resource
@@ -656,14 +743,14 @@ class MinionManagerServerEndpoint(object):
             allocation_flow = self._get_minion_pool_allocation_flow(
                 minion_pool)
             # start the deployment flow:
-            initial_store = self._get_pool_allocation_initial_store(
+            initial_store = self._get_pool_initial_taskflow_store_base(
                 ctxt, minion_pool, endpoint_dict)
             self._taskflow_runner.run_flow_in_background(
                 allocation_flow, store=initial_store)
 
         return self.get_minion_pool(ctxt, minion_pool.id)
 
-    def _get_pool_allocation_initial_store(
+    def _get_pool_initial_taskflow_store_base(
             self, ctxt, minion_pool, endpoint_dict):
         # NOTE: considering pools are associated to strictly one endpoint,
         # we can duplicate the 'origin/destination':
@@ -726,7 +813,7 @@ class MinionManagerServerEndpoint(object):
                     acceptable_allocation_statuses))
 
         allocation_flow = self._get_minion_pool_allocation_flow(minion_pool)
-        initial_store = self._get_pool_allocation_initial_store(
+        initial_store = self._get_pool_initial_taskflow_store_base(
             ctxt, minion_pool, endpoint_dict)
 
         try:
@@ -790,23 +877,13 @@ class MinionManagerServerEndpoint(object):
 
     def _get_pool_deallocation_initial_store(
             self, ctxt, minion_pool, endpoint_dict):
-        # NOTE: considering pools are associated to strictly one endpoint,
-        # we can duplicate the 'origin/destination':
-        origin_dest_info = {
-            "id": endpoint_dict['id'],
-            "connection_info": endpoint_dict['connection_info'],
-            "mapped_regions": endpoint_dict['mapped_regions'],
-            "type": endpoint_dict['type']}
-        initial_store = {
-            "context": ctxt,
-            "origin": origin_dest_info,
-            "destination": origin_dest_info,
-            "task_info": {
-                "pool_identifier": minion_pool.id,
-                "pool_os_type": minion_pool.os_type,
-                "pool_environment_options": minion_pool.environment_options,
-                "pool_shared_resources": minion_pool.shared_resources}}
-        return initial_store
+        """ Returns the initial taskflow store for deallocating a pool.
+
+        This is the store built by `_get_pool_initial_taskflow_store_base`
+        with `task_info['pool_shared_resources']` set to the pool's
+        `shared_resources`.
+        """
+        base = self._get_pool_initial_taskflow_store_base(
+            ctxt, minion_pool, endpoint_dict)
+        # make sure there is a 'task_info' dict to add the resources to:
+        if 'task_info' not in base:
+            base['task_info'] = {}
+        base['task_info']['pool_shared_resources'] = (
+            minion_pool.shared_resources)
+        return base
 
     @minion_manager_utils.minion_pool_synchronized_op
     def deallocate_minion_pool(self, ctxt, minion_pool_id, force=False):

+ 120 - 1
coriolis/minion_manager/rpc/tasks.py

@@ -5,9 +5,11 @@ import abc
 import copy
 
 from oslo_log import log as logging
+from oslo_utils import timeutils
 
 from coriolis import constants
 from coriolis import exception
+from coriolis import utils
 from coriolis.db import api as db_api
 from coriolis.db.sqlalchemy import models
 from coriolis.minion_manager.rpc import utils as minion_manager_utils
@@ -18,8 +20,11 @@ LOG = logging.getLogger(__name__)
 
 MINION_POOL_ALLOCATION_FLOW_NAME_FORMAT = "pool-%s-allocation"
 MINION_POOL_DEALLOCATION_FLOW_NAME_FORMAT = "pool-%s-deallocation"
+MINION_POOL_HEALTHCHECK_FLOW_NAME_FORMAT = "pool-%s-healthcheck"
 MINION_POOL_VALIDATION_TASK_NAME_FORMAT = "pool-%s-validation"
 MINION_POOL_UPDATE_STATUS_TASK_NAME_FORMAT = "pool-%s-update-status-%s"
+MINION_POOL_HEALTHCHECK_MACHINE_TASK_NAME_FORMAT = (
+    "pool-%s-machine-%s-healthcheck")
 MINION_POOL_ALLOCATE_SHARED_RESOURCES_TASK_NAME_FORMAT = (
     "pool-%s-allocate-shared-resources")
 MINION_POOL_DEALLOCATE_SHARED_RESOURCES_TASK_NAME_FORMAT = (
@@ -28,6 +33,10 @@ MINION_POOL_ALLOCATE_MINIONS_SUBFLOW_NAME_FORMAT = (
     "pool-%s-machines-allocation")
 MINION_POOL_DEALLOCATE_MACHINES_SUBFLOW_NAME_FORMAT = (
     "pool-%s-machines-deallocation")
+MINION_POOL_HEALTHCHECK_MACHINE_SUBFLOW_NAME_FORMAT = (
+    "pool-%s-machine-%s-healthcheck")
+MINION_POOL_REALLOCATE_MACHINE_SUBFLOW_NAME_FORMAT = (
+    "pool-%s-machine-%s-reallocation")
 MINION_POOL_ALLOCATE_MACHINE_TASK_NAME_FORMAT = (
     "pool-%s-machine-%s-allocation")
 MINION_POOL_DEALLOCATE_MACHINE_TASK_NAME_FORMAT = (
@@ -345,7 +354,7 @@ class AllocateMinionMachineTask(BaseMinionManangerTask):
 
     def __init__(
             self, minion_pool_id, minion_machine_id, minion_pool_type,
-            **kwargs):
+            allocate_to_action=None, **kwargs):
         resource_deployment_task_type = (
             constants.TASK_TYPE_CREATE_SOURCE_MINION_MACHINE)
         resource_cleanup_task_type = (
@@ -355,6 +364,7 @@ class AllocateMinionMachineTask(BaseMinionManangerTask):
                 constants.TASK_TYPE_CREATE_DESTINATION_MINION_MACHINE)
             resource_cleanup_task_type = (
                 constants.TASK_TYPE_DELETE_DESTINATION_MINION_MACHINE)
+        self._allocate_to_action = allocate_to_action
         super(AllocateMinionMachineTask, self).__init__(
             minion_pool_id, minion_machine_id, resource_deployment_task_type,
             cleanup_task_runner_type=resource_cleanup_task_type, **kwargs)
@@ -397,11 +407,16 @@ class AllocateMinionMachineTask(BaseMinionManangerTask):
             "ID '%s'" % (self._minion_machine_id))
 
         updated_values = {
+            "allocated_at": timeutils.utcnow(),
             "status": constants.MINION_MACHINE_STATUS_AVAILABLE,
             "connection_info": res['minion_connection_info'],
             "provider_properties": res['minion_provider_properties'],
             "backup_writer_connection_info": res[
                 "minion_backup_writer_connection_info"]}
+        if self._allocate_to_action:
+            updated_values["allocated_action"] = self._allocate_to_action
+            updated_values["status"] = (
+                constants.MINION_MACHINE_STATUS_RESERVED)
         db_api.update_minion_machine(
             context, self._minion_machine_id, updated_values)
 
@@ -500,3 +515,107 @@ class DeallocateMinionMachineTask(BaseMinionManangerTask):
             "ID '%s'" % (self._minion_machine_id))
 
         return task_info
+
+
+class HealthcheckMinionMachineTask(BaseMinionManangerTask):
+    """ Task which healthchecks the given minion machine.
+
+    A failing healthcheck is not raised by `execute`: it is reported
+    through the returned dict ({"healthy": bool, "error": str or None}),
+    which the deciders built by `make_minion_machine_healtcheck_decider`
+    read from the taskflow history.
+    """
+
+    def __init__(
+            self, minion_pool_id, minion_machine_id, minion_pool_type,
+            **kwargs):
+        """
+        :param minion_pool_id: ID of the pool the machine belongs to.
+        :param minion_machine_id: ID of the machine to healthcheck.
+        :param minion_pool_type: platform of the pool; anything other than
+            `constants.PROVIDER_PLATFORM_SOURCE` selects the destination
+            healthcheck task type.
+        """
+        # NOTE: one variable holds the task type for both platforms so
+        # that source pools do not reference an unbound name.
+        resource_healthcheck_task_type = (
+            constants.TASK_TYPE_HEALTHCHECK_SOURCE_MINION)
+        if minion_pool_type != constants.PROVIDER_PLATFORM_SOURCE:
+            resource_healthcheck_task_type = (
+                constants.TASK_TYPE_HEALTHCHECK_DESTINATION_MINION)
+        super(HealthcheckMinionMachineTask, self).__init__(
+            minion_pool_id, minion_machine_id,
+            resource_healthcheck_task_type, **kwargs)
+
+    def execute(self, context, origin, destination, task_info):
+        """ Runs the healthcheck and returns its outcome.
+
+        :returns: dict with a boolean "healthy" key and an "error" key
+            holding the failure description (None when healthy).
+        """
+        res = {
+            "healthy": True,
+            "error": None}
+
+        machine = self._get_minion_machine(context, self._minion_machine_id)
+        if not machine:
+            LOG.info(
+                "[Task '%s'] Could not find machine with ID '%s' in the DB. "
+                "Presuming it was already deleted so healthcheck failed.",
+                self._task_name, self._minion_machine_id)
+            return {
+                "healthy": False,
+                "error": "Machine not found."}
+
+        self._add_minion_pool_event(
+            context,
+            "Healthchecking  minion machine with internal pool ID '%s'" % (
+                self._minion_machine_id))
+
+        # the healthcheck only needs the machine's own properties, so the
+        # incoming `task_info` is not forwarded:
+        execution_info = {
+            "minion_provider_properties": machine.provider_properties,
+            "minion_connection_info": machine.connection_info}
+        try:
+            super(HealthcheckMinionMachineTask, self).execute(
+                context, origin, destination, execution_info)
+            self._add_minion_pool_event(
+                context,
+                "Successfully healtchecked minion machine with internal "
+                "pool ID '%s'" % self._minion_machine_id)
+        except Exception as ex:
+            # any failure is turned into an "unhealthy" result so that the
+            # decider can act on it:
+            self._add_minion_pool_event(
+                context,
+                "Healtcheck for machine '%s' has failed." % (
+                    self._minion_machine_id),
+                level=constants.TASK_EVENT_WARNING)
+            LOG.debug(
+                "[Task '%s'] Healtcheck failed for machine '%s' of pool '%s'. "
+                "Full trace was:\n%s", self._task_name,
+                self._minion_machine_id, self._minion_pool_id,
+                utils.get_exception_details())
+            res = {
+                "healthy": False,
+                "error": str(ex)}
+
+        return res
+
+    def _get_task_name(self, minion_pool_id, minion_machine_id):
+        return self.get_healthcheck_task_name(
+            minion_pool_id, minion_machine_id)
+
+    @classmethod
+    def get_healthcheck_task_name(cls, minion_pool_id, minion_machine_id):
+        """ Returns the name of the healthcheck task for the machine. """
+        return MINION_POOL_HEALTHCHECK_MACHINE_TASK_NAME_FORMAT % (
+            minion_pool_id, minion_machine_id)
+
+    @classmethod
+    def make_minion_machine_healtcheck_decider(
+            cls, minion_pool_id, minion_machine_id,
+            on_successful_healthcheck=True):
+        """ Returns a decider callable for links leaving the healthcheck
+        task of the given machine.
+
+        The decider returns `on_successful_healthcheck` when the recorded
+        result is healthy and its negation otherwise. When no result is
+        recorded for the healthcheck task it returns False.
+        """
+        def _healthcheck_decider(history):
+            healthcheck_task_name = (
+                cls.get_healthcheck_task_name(
+                    minion_pool_id, minion_machine_id))
+
+            # without a recorded result there is nothing to decide on, so
+            # decline instead of failing on the lookup below:
+            if not history or healthcheck_task_name not in history:
+                LOG.warning(
+                    "Could not find healthceck result for minion machine '%s' "
+                    "of pool '%s' (task name '%s'). NOT grennlighting futher "
+                    "tasks.", minion_machine_id, minion_pool_id,
+                    healthcheck_task_name)
+                return False
+
+            healtcheck_result = history[healthcheck_task_name]
+            if healtcheck_result['healthy']:
+                LOG.debug(
+                    "Healtcheck task '%s' confirmed worker health. Decider "
+                    "returning %s", healthcheck_task_name,
+                    on_successful_healthcheck)
+                return on_successful_healthcheck
+            else:
+                LOG.debug(
+                    "Healtcheck task '%s' denied worker health. Decider "
+                    "returning %s", healthcheck_task_name,
+                    not on_successful_healthcheck)
+                return not on_successful_healthcheck
+
+        return _healthcheck_decider

+ 4 - 0
coriolis/minion_pools/api.py

@@ -37,6 +37,10 @@ class API(object):
         return self._rpc_client.allocate_minion_pool(
             ctxt, minion_pool_id)
 
+    def healthcheck_minion_pool(self, ctxt, minion_pool_id):
+        """ Forwards the healthcheck request for the given pool to the RPC
+        client and returns its result. """
+        rpc_client = self._rpc_client
+        return rpc_client.healthcheck_minion_pool(ctxt, minion_pool_id)
+
     def deallocate_minion_pool(self, ctxt, minion_pool_id, force=False):
         return self._rpc_client.deallocate_minion_pool(
             ctxt, minion_pool_id, force=force)

+ 2 - 16
coriolis/osmorphing/osmount/windows.py

@@ -20,26 +20,12 @@ class WindowsMountTools(base.BaseOSMountTools):
 
         host = connection_info["ip"]
         port = connection_info.get("port", 5986)
-        username = connection_info["username"]
-        password = connection_info.get("password")
-        cert_pem = connection_info.get("cert_pem")
-        cert_key_pem = connection_info.get("cert_key_pem")
-        url = "https://%s:%s/wsman" % (host, port)
-
-        LOG.info("Connection info: %s", str(connection_info))
-
-        LOG.info("Waiting for connectivity on host: %(host)s:%(port)s",
-                 {"host": host, "port": port})
-        utils.wait_for_port_connectivity(host, port)
-
         self._event_manager.progress_update(
             "Connecting to WinRM host: %(host)s:%(port)s" %
             {"host": host, "port": port})
 
-        conn = wsman.WSManConnection()
-        conn.connect(url=url, username=username, password=password,
-                     cert_pem=cert_pem, cert_key_pem=cert_key_pem)
-        self._conn = conn
+        self._conn = wsman.WSManConnection.from_connection_info(
+            connection_info)
 
     def get_connection(self):
         return self._conn

+ 11 - 0
coriolis/policies/minion_pools.py

@@ -83,6 +83,17 @@ MINION_POOLS_DEFAULT_RULES = [
             }
         ]
     ),
+    policy.DocumentedRuleDefault(
+        get_minion_pools_policy_label('healthcheck'),
+        MINION_POOLS_DEFAULT_RULE,
+        "Healthcheck Minion Pool",
+        [
+            {
+                "path": "/minion_pools/{minion_pool_id}/actions",
+                "method": "POST"
+            }
+        ]
+    ),
     policy.DocumentedRuleDefault(
         get_minion_pools_policy_label('deallocate'),
         MINION_POOLS_DEFAULT_RULE,

+ 6 - 0
coriolis/providers/base.py

@@ -614,6 +614,12 @@ class _BaseMinionPoolProvider(
             self, ctxt, connection_info, minion_properties, volumes_info):
         pass
 
+    @abc.abstractmethod
+    def healthcheck_minion(
+            self, ctxt, environment_options, connection_info,
+            minion_properties, minion_connection_info):
+        """ Healthchecks the given minion machine.
+
+        Presumably implementations raise to signal an unhealthy minion;
+        verify against the healthcheck task runner.
+
+        NOTE(review): the healthcheck task runner added in
+        coriolis/tasks/minion_pool_tasks.py calls this method with four
+        positional arguments (ctxt, connection_info, minion_properties,
+        minion_connection_info), i.e. without `environment_options` --
+        confirm which signature is the intended one.
+        """
+        pass
+
 
 class BaseSourceMinionPoolProvider(_BaseMinionPoolProvider):
 

+ 17 - 4
coriolis/taskflow/base.py

@@ -106,12 +106,14 @@ class BaseRunWorkerTask(BaseCoriolisTaskflowTask):
 
     def __init__(
             self, task_name, task_id, task_instance, main_task_runner_type,
-            cleanup_task_runner_type=None, depends_on=None, **kwargs):
+            cleanup_task_runner_type=None, depends_on=None,
+            raise_on_cleanup_failure=False, **kwargs):
         self._task_id = task_id
         self._task_name = task_name
         self._task_instance = task_instance
         self._main_task_runner_type = main_task_runner_type
         self._cleanup_task_runner_type = cleanup_task_runner_type
+        self._raise_on_cleanup_failure = raise_on_cleanup_failure
 
         super(BaseRunWorkerTask, self).__init__(name=task_name, **kwargs)
 
@@ -233,9 +235,20 @@ class BaseRunWorkerTask(BaseCoriolisTaskflowTask):
                 self._task_name, self._main_task_runner_type)
             return original_result
 
-        res = self._execute_task(
-            context, self._task_id, self._cleanup_task_runner_type, origin,
-            destination, task_info)
+        try:
+            res = self._execute_task(
+                context, self._task_id, self._cleanup_task_runner_type, origin,
+                destination, task_info)
+        except Exception as ex:
+            LOG.warn(
+                "Task cleanup for '%s' (main task type '%s', cleanup task type"
+                "'%s') has failed with the following trace: %s",
+                self._task_name, self._main_task_runner_type,
+                self._cleanup_task_runner_type, utils.get_exception_details())
+            if self._raise_on_cleanup_failure:
+                raise
+            return original_result
+
         LOG.debug(
             "Reversion of taskflow task '%s' (ID '%s') was successfully "
             "executed using task runner '%s' with the following result: %s" % (

+ 5 - 1
coriolis/tasks/factory.py

@@ -128,7 +128,11 @@ _TASKS_MAP = {
     constants.TASK_TYPE_RELEASE_OSMORPHING_MINION:
         minion_pool_tasks.ReleaseOSMorphingMinionTask,
     constants.TASK_TYPE_COLLECT_OSMORPHING_INFO:
-        minion_pool_tasks.CollectOSMorphingInfoTask
+        minion_pool_tasks.CollectOSMorphingInfoTask,
+    constants.TASK_TYPE_HEALTHCHECK_SOURCE_MINION:
+        minion_pool_tasks.HealthcheckSourceMinionMachineTask,
+    constants.TASK_TYPE_HEALTHCHECK_DESTINATION_MINION:
+        minion_pool_tasks.HealthcheckDestinationMinionTask
 }
 
 

+ 65 - 0
coriolis/tasks/minion_pool_tasks.py

@@ -865,3 +865,68 @@ class CollectOSMorphingInfoTask(base.TaskRunner):
 
         return {
             "osmorphing_info": result["osmorphing_info"]}
+
+
+class _BaseHealthcheckMinionMachineTask(base.TaskRunner):
+    """ Task runner which asks the minion pool provider of the required
+    platform to healthcheck a minion machine.
+
+    Concrete subclasses select the platform through
+    `get_required_platform`.
+    """
+
+    @classmethod
+    def get_required_platform(cls):
+        # must be overridden by the per-platform subclasses:
+        raise NotImplementedError(
+            "No minion healthcheck platform specified")
+
+    @classmethod
+    def get_required_task_info_properties(cls):
+        return ["minion_provider_properties", "minion_connection_info"]
+
+    @classmethod
+    def get_returned_task_info_properties(cls):
+        return []
+
+    @classmethod
+    def get_required_provider_types(cls):
+        platform = cls.get_required_platform()
+        return _get_required_minion_pool_provider_types_for_platform(
+            platform)
+
+    def _run(self, ctxt, instance, origin, destination,
+             task_info, event_handler):
+        """ Calls the provider's `healthcheck_minion` for the machine
+        described by `task_info` and returns `task_info` unchanged. """
+        platform = self.get_required_platform()
+        if platform == constants.TASK_PLATFORM_SOURCE:
+            target_endpoint = origin
+        elif platform == constants.TASK_PLATFORM_DESTINATION:
+            target_endpoint = destination
+        else:
+            raise NotImplementedError(
+                "Unknown minion healthcheck platform '%s'" % (
+                    platform))
+
+        connection_info = base.get_connection_info(ctxt, target_endpoint)
+        # the first provider type listed for the platform is the one used:
+        provider_types = self.get_required_provider_types()
+        provider_type = provider_types[self.get_required_platform()][0]
+        provider = providers_factory.get_provider(
+            target_endpoint["type"], provider_type, event_handler)
+
+        minion_properties = task_info['minion_provider_properties']
+        minion_connection_info = base.unmarshal_migr_conn_info(
+            task_info['minion_connection_info'])
+
+        # NOTE(review): the abstract `healthcheck_minion` added in
+        # coriolis/providers/base.py also declares an `environment_options`
+        # parameter which is not passed here -- confirm the signature.
+        provider.healthcheck_minion(
+            ctxt, connection_info, minion_properties, minion_connection_info)
+
+        return task_info
+
+
+class HealthcheckSourceMinionMachineTask(_BaseHealthcheckMinionMachineTask):
+    """ Minion machine healthcheck task runner for the source platform. """
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_SOURCE
+
+
+class HealthcheckDestinationMinionTask(_BaseHealthcheckMinionMachineTask):
+    """ Minion machine healthcheck task runner for the destination
+    platform. """
+
+    @classmethod
+    def get_required_platform(cls):
+        return constants.TASK_PLATFORM_DESTINATION

+ 1 - 3
coriolis/utils.py

@@ -11,7 +11,6 @@ import io
 import json
 import os
 import pickle
-import platform
 import re
 import socket
 import string
@@ -20,8 +19,6 @@ import sys
 import time
 import traceback
 import uuid
-import __main__ as main
-
 from io import StringIO
 
 import OpenSSL
@@ -29,6 +26,7 @@ from oslo_config import cfg
 from oslo_log import log as logging
 from oslo_serialization import jsonutils
 
+import __main__ as main
 import netifaces
 import paramiko
 # NOTE(gsamfira): I am aware that this is not ideal, but pip

+ 35 - 0
coriolis/wsman.py

@@ -48,6 +48,41 @@ class WSManConnection(object):
             cert_pem=cert_pem,
             cert_key_pem=cert_key_pem)
 
+    @classmethod
+    def from_connection_info(cls, connection_info):
+        """ Returns a connected wsman.WSManConnection for the given info.
+
+        :param connection_info: dict with the mandatory "ip" and "username"
+            keys and the optional "port" (defaults to 5986), "password",
+            "cert_pem" and "cert_key_pem" keys.
+        :raises ValueError: if `connection_info` is not a dict or lacks a
+            mandatory key.
+        """
+        # NOTE: neither error messages nor logs may include the values of
+        # the connection info, as it carries credentials.
+        if not isinstance(connection_info, dict):
+            raise ValueError(
+                "WSMan connection must be a dict. Got type '%s'." % (
+                    type(connection_info)))
+
+        # "password" is optional: it is read with .get() below and
+        # certificate-based connection info does not need to carry one.
+        required_keys = ["ip", "username"]
+        missing = [key for key in required_keys if key not in connection_info]
+        if missing:
+            raise ValueError(
+                "The following keys were missing from WSMan connection info %s. "
+                "Got keys: %s" % (missing, list(connection_info)))
+
+        host = connection_info["ip"]
+        port = connection_info.get("port", 5986)
+        username = connection_info["username"]
+        password = connection_info.get("password")
+        cert_pem = connection_info.get("cert_pem")
+        cert_key_pem = connection_info.get("cert_key_pem")
+        url = "https://%s:%s/wsman" % (host, port)
+
+        # mask the secrets before logging the connection info:
+        secret_keys = ("password", "cert_key_pem")
+        redacted_info = {
+            key: ("***" if key in secret_keys and value else value)
+            for key, value in connection_info.items()}
+        LOG.info("Connection info: %s", redacted_info)
+
+        LOG.info("Waiting for connectivity on host: %(host)s:%(port)s",
+                 {"host": host, "port": port})
+        utils.wait_for_port_connectivity(host, port)
+
+        conn = cls()
+        conn.connect(url=url, username=username, password=password,
+                     cert_pem=cert_pem, cert_key_pem=cert_key_pem)
+
+        return conn
+
     def disconnect(self):
         self._protocol = None