Explorar el Código

Add periodic pool refreshing.

Nashwan Azhari hace 5 años
padre
commit
f5730aad5d

+ 2 - 2
coriolis/api/v1/minion_pool_actions.py

@@ -30,7 +30,7 @@ class MinionPoolActionsController(api_wsgi.Controller):
         except exception.InvalidParameterValue as ex:
             raise exc.HTTPNotFound(explanation=ex.msg)
 
-    @api_wsgi.action('healthcheck')
+    @api_wsgi.action('refresh')
     def _healthcheck_pool(self, req, id, body):
         context = req.environ['coriolis.context']
         context.can(
@@ -38,7 +38,7 @@ class MinionPoolActionsController(api_wsgi.Controller):
                 "healthcheck"))
         try:
             return minion_pool_view.single(
-                req, self.minion_pool_api.healthcheck_minion_pool(
+                req, self.minion_pool_api.refresh_minion_pool(
                     context, id))
         except exception.NotFound as ex:
             raise exc.HTTPNotFound(explanation=ex.msg)

+ 4 - 3
coriolis/constants.py

@@ -359,10 +359,11 @@ ACTIVE_MINION_POOL_STATUSES = [
 
 MINION_MACHINE_IDENTIFIER_FORMAT = "coriolis-pool-%(pool_id)s-minion-%(minion_id)s"
 MINION_MACHINE_STATUS_UNINITIALIZED = "UNINITIALIZED"
-MINION_MACHINE_STATUS_DEPLOYING = "DEPLOYING"
+MINION_MACHINE_STATUS_HEALTHCHECKING = "HEALTHCHECKING"
+MINION_MACHINE_STATUS_ALLOCATING = "ALLOCATING"
+MINION_MACHINE_STATUS_DEALLOCATING = "DEALLOCATING"
 MINION_MACHINE_STATUS_ERROR = "ERROR"
 MINION_MACHINE_STATUS_ERROR_DEPLOYING = "ERROR_DEPLOYING"
-MINION_MACHINE_STATUS_RECONFIGURING = "RECONFIGURING"
 MINION_MACHINE_STATUS_AVAILABLE = "AVAILABLE"
-MINION_MACHINE_STATUS_ALLOCATED = "ALLOCATED"
+MINION_MACHINE_STATUS_IN_USE = "IN_USE"
 MINION_MACHINE_STATUS_RESERVED = "RESERVED"

+ 20 - 3
coriolis/db/api.py

@@ -1210,11 +1210,25 @@ def update_minion_machine(context, minion_machine_id, updated_values):
     updateable_fields = [
         "connection_info", "provider_properties", "status",
         "backup_writer_connection_info", "allocated_action",
-        "allocated_at"]
+        "last_used_at"]
     _update_sqlalchemy_object_fields(
         minion_machine, updateable_fields, updated_values)
 
 
+@enginefacade.writer
+def set_minion_machine_status(context, minion_machine_id, status):
+    machine = get_minion_machine(context, minion_machine_id)
+    if not machine:
+        raise exception.NotFound(
+            "Minion machine with ID '%s' not found" % minion_machine_id)
+    LOG.debug(
+        "Transitioning minion machine '%s' (pool '%s') from status '%s' to "
+        "'%s' in the DB",
+        minion_machine_id, machine.pool_id, machine.status, status)
+    machine.status = status
+    setattr(machine, 'updated_at', timeutils.utcnow())
+
+
 @enginefacade.writer
 def set_minion_machines_allocation_statuses(
         context, minion_machine_ids, action_id, allocation_status,
@@ -1239,7 +1253,7 @@ def set_minion_machines_allocation_statuses(
                 machine.allocated_action, action_id))
         machine.allocated_action = action_id
         if refresh_allocation_time:
-            machine.allocated_at = timeutils.utcnow()
+            machine.last_used_at = timeutils.utcnow()
         machine.status = allocation_status
 
 
@@ -1335,8 +1349,11 @@ def add_minion_pool_execution(context, execution):
 def set_minion_pool_status(context, minion_pool_id, status):
     pool = get_minion_pool(
         context, minion_pool_id, include_machines=False)
+    if not pool:
+        raise exception.NotFound(
+            "Minion pool '%s' not found" % minion_pool_id)
     LOG.debug(
-        "Transitioning minion pool '%s' from status '%s' to '%s'in DB",
+        "Transitioning minion pool '%s' from status '%s' to '%s' in DB",
         minion_pool_id, pool.status, status)
     pool.status = status
     setattr(pool, 'updated_at', timeutils.utcnow())

+ 1 - 1
coriolis/db/sqlalchemy/migrate_repo/versions/016_adds_minion_vm_pools.py

@@ -87,7 +87,7 @@ def upgrade(migrate_engine):
             sqlalchemy.Column(
                 'allocated_action', sqlalchemy.String(36), nullable=True),
             sqlalchemy.Column(
-                'allocated_at', sqlalchemy.DateTime, nullable=True),
+                'last_used_at', sqlalchemy.DateTime, nullable=True),
             sqlalchemy.Column(
                 'status', sqlalchemy.String(255), nullable=False,
                 default=lambda: "UNKNOWN"),

+ 3 - 3
coriolis/db/sqlalchemy/models.py

@@ -485,12 +485,12 @@ class MinionMachine(BASE, models.TimestampMixin, models.ModelBase,
 
     status = sqlalchemy.Column(
         sqlalchemy.String(255), nullable=False,
-        default=lambda: constants.MINION_MACHINE_STATUS_UNKNOWN)
+        default=lambda: constants.MINION_MACHINE_STATUS_UNINITIALIZED)
 
     allocated_action = sqlalchemy.Column(
         sqlalchemy.String(36), nullable=True)
 
-    allocated_at = sqlalchemy.Column(
+    last_used_at = sqlalchemy.Column(
         sqlalchemy.types.DateTime, nullable=True)
 
     connection_info = sqlalchemy.Column(
@@ -515,7 +515,7 @@ class MinionMachine(BASE, models.TimestampMixin, models.ModelBase,
             "status": self.status,
             "connection_info": self.connection_info,
             "allocated_action": self.allocated_action,
-            "allocated_at": self.allocated_at,
+            "last_used_at": self.last_used_at,
             "backup_writer_connection_info": (
                 self.backup_writer_connection_info),
             "provider_properties": self.provider_properties

+ 2 - 2
coriolis/minion_manager/rpc/client.py

@@ -111,9 +111,9 @@ class MinionManagerClient(object):
             ctxt, "allocate_minion_pool",
             minion_pool_id=minion_pool_id)
 
-    def healthcheck_minion_pool(self, ctxt, minion_pool_id):
+    def refresh_minion_pool(self, ctxt, minion_pool_id):
         return self._client.call(
-            ctxt, "healthcheck_minion_pool",
+            ctxt, "refresh_minion_pool",
             minion_pool_id=minion_pool_id)
 
     def deallocate_minion_pool(

+ 253 - 83
coriolis/minion_manager/rpc/server.py

@@ -1,22 +1,27 @@
 # Copyright 2020 Cloudbase Solutions Srl
 # All Rights Reserved.
 
-import contextlib
-import itertools
+import datetime
+import math
 import uuid
 
 from oslo_config import cfg
 from oslo_log import log as logging
+from oslo_utils import timeutils
+from taskflow import deciders as taskflow_deciders
 from taskflow.patterns import graph_flow
 from taskflow.patterns import linear_flow
 from taskflow.patterns import unordered_flow
 
 from coriolis import constants
+from coriolis import context
 from coriolis import exception
 from coriolis import utils
 from coriolis.conductor.rpc import client as rpc_conductor_client
+from coriolis.cron import cron
 from coriolis.db import api as db_api
 from coriolis.db.sqlalchemy import models
+from coriolis.minion_manager.rpc import client as rpc_minion_manager_client
 from coriolis.minion_manager.rpc import tasks as minion_manager_tasks
 from coriolis.minion_manager.rpc import utils as minion_manager_utils
 from coriolis.scheduler.rpc import client as rpc_scheduler_client
@@ -28,14 +33,90 @@ VERSION = "1.0"
 
 LOG = logging.getLogger(__name__)
 
-MINION_MANAGER_OPTS = []
+MINION_MANAGER_OPTS = [
+    cfg.IntOpt(
+        "minion_pool_default_refresh_period_minutes",
+        default=10,
+        help="Number of minutes in which to refresh minion pools.")]
 
 CONF = cfg.CONF
 CONF.register_opts(MINION_MANAGER_OPTS, 'minion_manager')
 
+MINION_POOL_REFRESH_CRON_JOB_NAME_FORMAT = "pool-%s-refresh-minute-%d"
+MINION_POOL_REFRESH_CRON_JOB_DESCRIPTION_FORMAT = (
+    "Regularly scheduled refresh job for minion pool '%s' on minute %d.")
+
+
+def _trigger_pool_refresh(ctxt, minion_manager_client, minion_pool_id):
+    try:
+        minion_manager_client.refresh_minion_pool(
+            ctxt, minion_pool_id)
+    except exception.InvalidMinionPoolState as ex:
+        LOG.warn(
+            "Minion Pool '%s' is in an invalid state for having a refresh run."
+            " Skipping for now. Error was: %s", minion_pool_id, str(ex))
+
 
 class MinionManagerServerEndpoint(object):
 
+    def __init__(self):
+        self._cron = cron.Cron()
+        self._admin_ctxt = context.get_admin_context()
+        # self._init_cron()
+
+    def _init_cron(self):
+        now = timeutils.utcnow()
+        minion_pools = db_api.get_minion_pools(
+            self._admin_ctxt, include_machines=False,
+            include_progress_updates=False, include_events=False)
+        for minion_pool in minion_pools:
+            active_pool_statuses = [constants.MINION_POOL_STATUS_ALLOCATED]
+            if minion_pool.status not in active_pool_statuses:
+                LOG.debug(
+                    "Not setting any refresh schedules for minion pool '%s' "
+                    "as it is in an inactive status '%s'.",
+                    minion_pool.id, minion_pool.status)
+                continue
+            LOG.debug(
+                "Adding refresh schedule for minion pool '%s' as part of "
+                "server startup.", minion_pool.id)
+            self._register_refresh_jobs_for_minion_pool(minion_pool, date=now)
+
+    def _register_refresh_jobs_for_minion_pool(
+            self, minion_pool, date=None, period_minutes=None):
+        if not period_minutes:
+            period_minutes = CONF.minion_manager.minion_pool_default_refresh_period_minutes
+        if period_minutes <= 0:
+            LOG.warn(
+                "Got zero or negative pool refresh period %s. Defaulting to "
+                "1.", period_minutes)
+            period_minutes = 1
+        if period_minutes > 60:
+            LOG.warn(
+                "Selected pool refresh period_minutes is greater than 60, defaulting "
+                "to 10. Original value was: %s", period_minutes)
+            period_minutes = 10
+        if not date:
+            date = timeutils.utcnow()
+        admin_ctxt = context.get_admin_context()
+        description = (
+            "Scheduled refresh job for minion pool '%s'" % minion_pool.id)
+
+        # NOTE: we need to generate hourly schedules for each minute in
+        # the hour we would like the refresh to be triggered:
+        for minute in [
+                period_minutes * i for i in range(
+                    math.ceil(60 / period_minutes))]:
+            name = MINION_POOL_REFRESH_CRON_JOB_NAME_FORMAT % (
+                minion_pool.id, minute)
+            description = MINION_POOL_REFRESH_CRON_JOB_DESCRIPTION_FORMAT % (
+                minion_pool.id, minute)
+            self._cron.register(
+                cron.CronJob(
+                    name, description, {"minute": minute}, True, None, None,
+                    None, _trigger_pool_refresh, admin_ctxt,
+                    self._rpc_minion_manager_client, minion_pool.id))
+
     @property
     def _taskflow_runner(self):
         return taskflow_runner.TaskFlowRunner(
@@ -52,21 +133,25 @@ class MinionManagerServerEndpoint(object):
         return rpc_worker_client.WorkerClient()
 
     @property
-    def _scheduler_client(self):
+    def _rpc_scheduler_client(self):
         return rpc_scheduler_client.SchedulerClient()
 
     @property
-    def _conductor_client(self):
+    def _rpc_conductor_client(self):
         return rpc_conductor_client.ConductorClient()
 
+    @property
+    def _rpc_minion_manager_client(self):
+        return rpc_minion_manager_client.MinionManagerClient()
+
     def get_diagnostics(self, ctxt):
         return utils.get_diagnostics_info()
 
     def get_endpoint_source_minion_pool_options(
             self, ctxt, endpoint_id, env, option_names):
-        endpoint = self._conductor_client.get_endpoint(ctxt, endpoint_id)
+        endpoint = self._rpc_conductor_client.get_endpoint(ctxt, endpoint_id)
 
-        worker_service = self._scheduler_client.get_worker_service_for_specs(
+        worker_service = self._rpc_scheduler_client.get_worker_service_for_specs(
             ctxt, enabled=True,
             region_sets=[[reg['id'] for reg in endpoint['mapped_regions']]],
             provider_requirements={
@@ -81,9 +166,9 @@ class MinionManagerServerEndpoint(object):
 
     def get_endpoint_destination_minion_pool_options(
             self, ctxt, endpoint_id, env, option_names):
-        endpoint = self._conductor_client.get_endpoint(ctxt, endpoint_id)
+        endpoint = self._rpc_conductor_client.get_endpoint(ctxt, endpoint_id)
 
-        worker_service = self._scheduler_client.get_worker_service_for_specs(
+        worker_service = self._rpc_scheduler_client.get_worker_service_for_specs(
             ctxt, enabled=True,
             region_sets=[[reg['id'] for reg in endpoint['mapped_regions']]],
             provider_requirements={
@@ -98,9 +183,9 @@ class MinionManagerServerEndpoint(object):
 
     def validate_endpoint_source_minion_pool_options(
             self, ctxt, endpoint_id, pool_environment):
-        endpoint = self._conductor_client.get_endpoint(ctxt, endpoint_id)
+        endpoint = self._rpc_conductor_client.get_endpoint(ctxt, endpoint_id)
 
-        worker_service = self._scheduler_client.get_worker_service_for_specs(
+        worker_service = self._rpc_scheduler_client.get_worker_service_for_specs(
             ctxt, enabled=True,
             region_sets=[[reg['id'] for reg in endpoint['mapped_regions']]],
             provider_requirements={
@@ -114,9 +199,9 @@ class MinionManagerServerEndpoint(object):
 
     def validate_endpoint_destination_minion_pool_options(
             self, ctxt, endpoint_id, pool_environment):
-        endpoint = self._conductor_client.get_endpoint(ctxt, endpoint_id)
+        endpoint = self._rpc_conductor_client.get_endpoint(ctxt, endpoint_id)
 
-        worker_service = self._scheduler_client.get_worker_service_for_specs(
+        worker_service = self._rpc_scheduler_client.get_worker_service_for_specs(
             ctxt, enabled=True,
             region_sets=[[reg['id'] for reg in endpoint['mapped_regions']]],
             provider_requirements={
@@ -354,7 +439,7 @@ class MinionManagerServerEndpoint(object):
                 [constants.MINION_MACHINE_STATUS_UNINITIALIZED])
             self.deallocate_minion_machines_for_action(
                 ctxt, replica['id'])
-            self._conductor_client.report_replica_minions_allocation_error(
+            self._rpc_conductor_client.report_replica_minions_allocation_error(
                 ctxt, replica['id'], str(ex))
             raise
 
@@ -378,7 +463,7 @@ class MinionManagerServerEndpoint(object):
                 [constants.MINION_MACHINE_STATUS_UNINITIALIZED])
             self.deallocate_minion_machines_for_action(
                 ctxt, migration['id'])
-            self._conductor_client.report_migration_minions_allocation_error(
+            self._rpc_conductor_client.report_migration_minions_allocation_error(
                 ctxt, migration['id'], str(ex))
             raise
 
@@ -422,15 +507,16 @@ class MinionManagerServerEndpoint(object):
             for machine in minion_pool.minion_machines:
                 if exclude and machine.id in exclude:
                     LOG.debug(
-                        "Excluding minion machine '%s' from search.",
-                        machine.id)
+                        "Excluding minion machine '%s' from search for use "
+                        "action '%s'", machine.id, action_id)
                     continue
                 if machine.status != constants.MINION_MACHINE_STATUS_AVAILABLE:
                     LOG.debug(
                         "Minion machine with ID '%s' is in status '%s' "
-                        "instead of '%s'. Skipping.",
+                        "instead of the expected '%s'. Skipping for use "
+                        "with action '%s'.",
                         machine.id, machine.status,
-                        constants.MINION_MACHINE_STATUS_AVAILABLE)
+                        constants.MINION_MACHINE_STATUS_AVAILABLE, action_id)
                     continue
                 selected_machine = machine
                 break
@@ -454,11 +540,17 @@ class MinionManagerServerEndpoint(object):
                 # take note of the machine and setup a healthcheck:
                 instance_minion_allocations[instance] = minion_machine.id
                 existing_machines_to_allocate[minion_machine.id] = instance
+                LOG.debug(
+                    "Allocating pre-existing machine '%s' from pool '%s' for "
+                    "use with action with ID '%s'.",
+                    minion_machine.id, minion_pool.id, action_id)
                 allocation_subflow.add(
-                    minion_manager_tasks.HealthcheckMinionMachineTask(
-                        minion_pool.id, minion_machine.id,
-                        minion_pool.platform, fail_on_error=True,
-                        inject=inject_for_tasks))
+                    self._get_healtchcheck_flow_for_minion_machine(
+                        minion_pool, minion_machine.id,
+                        allocate_to_action=action_id,
+                        inject_for_tasks=inject_for_tasks,
+                        machine_status_on_success=(
+                            constants.MINION_MACHINE_STATUS_IN_USE)))
             else:
                 # add task which creates the new machine:
                 new_machine_id = str(uuid.uuid4())
@@ -493,7 +585,7 @@ class MinionManagerServerEndpoint(object):
                 minion_pool.id, action_id, existing_machines_to_allocate)
             db_api.set_minion_machines_allocation_statuses(
                 ctxt, list(existing_machines_to_allocate.keys()),
-                action_id, constants.MINION_MACHINE_STATUS_ALLOCATED,
+                action_id, constants.MINION_MACHINE_STATUS_IN_USE,
                 refresh_allocation_time=True)
 
             # add any new machine entries to the DB:
@@ -544,6 +636,10 @@ class MinionManagerServerEndpoint(object):
                     continue
             raise
 
+        LOG.debug(
+            "The following minion machine allocation from pool '%s' were or "
+            "will be made for action '%s': %s",
+            minion_pool.id, action_id, instance_minion_allocations)
         return {
             "flow": allocation_subflow,
             "action_instance_minion_allocation_mappings": (
@@ -623,7 +719,7 @@ class MinionManagerServerEndpoint(object):
                     ctxt, action['origin_minion_pool_id'],
                     include_machines=True, include_events=False,
                     include_progress_updates=False)
-                endpoint_dict = self._conductor_client.get_endpoint(
+                endpoint_dict = self._rpc_conductor_client.get_endpoint(
                     ctxt, minion_pool.endpoint_id)
                 origin_pool_store = self._get_pool_initial_taskflow_store_base(
                     ctxt, minion_pool, endpoint_dict)
@@ -657,7 +753,7 @@ class MinionManagerServerEndpoint(object):
                     ctxt, action['destination_minion_pool_id'],
                     include_machines=True, include_events=False,
                     include_progress_updates=False)
-                endpoint_dict = self._conductor_client.get_endpoint(
+                endpoint_dict = self._rpc_conductor_client.get_endpoint(
                     ctxt, minion_pool.endpoint_id)
                 destination_pool_store = (
                     self._get_pool_initial_taskflow_store_base(
@@ -723,7 +819,7 @@ class MinionManagerServerEndpoint(object):
                         ctxt, osmorphing_pool_id,
                         include_machines=True, include_events=False,
                         include_progress_updates=False)
-                    endpoint_dict = self._conductor_client.get_endpoint(
+                    endpoint_dict = self._rpc_conductor_client.get_endpoint(
                         ctxt, minion_pool.endpoint_id)
                     osmorphing_pool_store = self._get_pool_initial_taskflow_store_base(
                         ctxt, minion_pool, endpoint_dict)
@@ -824,7 +920,7 @@ class MinionManagerServerEndpoint(object):
                 minion_machine_id)
             return
 
-        machine_allocated_status = constants.MINION_MACHINE_STATUS_ALLOCATED
+        machine_allocated_status = constants.MINION_MACHINE_STATUS_IN_USE
         with minion_manager_utils.get_minion_pool_lock(
                 minion_machine.pool_id, external=True):
             if minion_machine.status != machine_allocated_status or (
@@ -910,7 +1006,57 @@ class MinionManagerServerEndpoint(object):
             "Successfully released all minion machines associated "
             "with action with base_id '%s'.", action_id)
 
-    def _get_minion_pool_healthcheck_flow(
+    def _get_healtchcheck_flow_for_minion_machine(
+            self, minion_pool, minion_machine_id, allocate_to_action=None,
+            machine_status_on_success=constants.MINION_MACHINE_STATUS_AVAILABLE,
+            inject_for_tasks=None):
+        """ Returns a taskflow graph flow with a healthcheck task
+        and redeployment subflow on error. """
+        # define healthcheck subflow for each machine:
+        machine_healthcheck_subflow = graph_flow.Flow(
+            minion_manager_tasks.MINION_POOL_HEALTHCHECK_MACHINE_SUBFLOW_NAME_FORMAT % (
+                minion_pool.id, minion_machine_id))
+
+        # add healthcheck task to healthcheck subflow:
+        machine_healthcheck_task = (
+            minion_manager_tasks.HealthcheckMinionMachineTask(
+                minion_pool.id, minion_machine_id, minion_pool.platform,
+                machine_status_on_success=machine_status_on_success,
+                fail_on_error=False, inject=inject_for_tasks))
+        machine_healthcheck_subflow.add(machine_healthcheck_task)
+
+        # define reallocation subflow:
+        machine_reallocation_subflow = linear_flow.Flow(
+            minion_manager_tasks.MINION_POOL_REALLOCATE_MACHINE_SUBFLOW_NAME_FORMAT % (
+                minion_pool.id, minion_machine_id))
+        machine_reallocation_subflow.add(
+            minion_manager_tasks.DeallocateMinionMachineTask(
+                minion_pool.id, minion_machine_id, minion_pool.platform,
+                inject=inject_for_tasks))
+        machine_reallocation_subflow.add(
+            minion_manager_tasks.AllocateMinionMachineTask(
+                minion_pool.id, minion_machine_id, minion_pool.platform,
+                allocate_to_action=allocate_to_action,
+                inject=inject_for_tasks))
+        machine_healthcheck_subflow.add(
+            machine_reallocation_subflow,
+            # NOTE: this is required to not have taskflow attempt (and fail)
+            # to automatically link the above Healthcheck task to the
+            # new subflow based on inputs/outputs alone:
+            resolve_existing=False)
+
+        # link reallocation subflow to healthcheck task:
+        machine_healthcheck_subflow.link(
+            machine_healthcheck_task, machine_reallocation_subflow,
+            # NOTE: this is required to prevent any parent flows from skipping:
+            decider_depth=taskflow_deciders.Depth.FLOW,
+            decider=minion_manager_tasks.MinionMachineHealtchcheckDecider(
+                minion_pool.id, minion_machine_id,
+                on_successful_healthcheck=False))
+
+        return machine_healthcheck_subflow
+
+    def _get_minion_pool_refresh_flow(
             self, ctxt, minion_pool, requery=True):
 
         if requery:
@@ -918,54 +1064,79 @@ class MinionManagerServerEndpoint(object):
                 ctxt, minion_pool.id, include_machines=True,
                 include_progress_updates=False, include_events=False)
 
-        pool_healthcheck_flow = unordered_flow.Flow(
-            minion_manager_tasks.MINION_POOL_HEALTHCHECK_FLOW_NAME_FORMAT % (
+        pool_refresh_flow = unordered_flow.Flow(
+            minion_manager_tasks.MINION_POOL_REFRESH_FLOW_NAME_FORMAT % (
                 minion_pool.id))
+        max_minions_to_deallocate = (
+            len(minion_pool.minion_machines) - minion_pool.minimum_minions)
+        now = timeutils.utcnow()
+        machines_to_deallocate = []
+        machines_to_healthcheck = []
+        skipped_machines = {}
 
         for machine in minion_pool.minion_machines:
-            # define healthcheck subflow for each machine:
-            machine_healthcheck_subflow = graph_flow.Flow(
-                minion_manager_tasks.MINION_POOL_HEALTHCHECK_MACHINE_SUBFLOW_NAME_FORMAT % (
-                    minion_pool.id, machine.id))
-
-            # add healtcheck task to healthcheck subflow:
-            machine_healthcheck_task = (
-                minion_manager_tasks.HealthcheckMinionMachineTask(
-                    minion_pool.id, machine.id, minion_pool.platform))
-            machine_healthcheck_subflow.add(machine_healthcheck_task)
-
-            # define reallocation subflow:
-            machine_reallocation_subflow = linear_flow.Flow(
-                minion_manager_tasks.MINION_POOL_REALLOCATE_MACHINE_SUBFLOW_NAME_FORMAT % (
-                    minion_pool.id, machine.id))
-            machine_reallocation_subflow.add(
-                minion_manager_tasks.DeallocateMinionMachineTask(
-                    minion_pool.id, machine.id, minion_pool.platform))
-            machine_reallocation_subflow.add(
-                minion_manager_tasks.AllocateMinionMachineTask(
-                    minion_pool.id, machine.id, minion_pool.platform))
-            machine_healthcheck_subflow.add(machine_reallocation_subflow)
-
-            # link reallocation subflow to healthcheck task:
-            healthcheck_cls = minion_manager_tasks.HealthcheckMinionMachineTask
-            machine_healthcheck_subflow.link(
-                machine_healthcheck_task, machine_reallocation_subflow,
-                decider=healthcheck_cls.make_minion_machine_healtcheck_decider(
-                    minion_pool.id, machine.id,
-                    on_successful_healthcheck=False))
+            if machine.status != constants.MINION_MACHINE_STATUS_AVAILABLE:
+                skipped_machines[machine.id] = machine.status
+                continue
 
-            # add the healthcheck subflow to the main flow:
-            pool_healthcheck_flow.add(machine_healthcheck_subflow)
+            minion_expired = True
+            if machine.last_used_at:
+                expiry_time = (
+                    machine.last_used_at + datetime.timedelta(
+                        seconds=minion_pool.minion_max_idle_time))
+                minion_expired = expiry_time <= now
+
+            # deallocate the machine if it is expired:
+            if max_minions_to_deallocate > 0 and minion_expired:
+                pool_refresh_flow.add(
+                    minion_manager_tasks.DeallocateMinionMachineTask(
+                        minion_pool.id, machine.id, minion_pool.platform))
+                max_minions_to_deallocate = max_minions_to_deallocate - 1
+                machines_to_deallocate.append(machine.id)
+            # else, perform a healthcheck on the machine:
+            else:
+                pool_refresh_flow.add(
+                    self._get_healtchcheck_flow_for_minion_machine(
+                        minion_pool, machine.id, allocate_to_action=None,
+                        machine_status_on_success=(
+                            constants.MINION_MACHINE_STATUS_AVAILABLE)))
+                machines_to_healthcheck.append(machine.id)
+
+        # update DB entries for all machines:
+        if machines_to_deallocate:
+            LOG.debug(
+                "The following minion machines will be deallocated as part "
+                "of the refreshing of minion pool '%s': %s",
+                minion_pool.id, machines_to_deallocate)
+            for machine in machines_to_deallocate:
+                db_api.set_minion_machine_status(
+                    ctxt, machine,
+                    constants.MINION_MACHINE_STATUS_DEALLOCATING)
+        if machines_to_healthcheck:
+            LOG.debug(
+                "The following minion machines will be healthchecked as part "
+                "of the refreshing of minion pool '%s': %s",
+                minion_pool.id, machines_to_healthcheck)
+            for machine in machines_to_healthcheck:
+                db_api.set_minion_machine_status(
+                    ctxt, machine,
+                    constants.MINION_MACHINE_STATUS_HEALTHCHECKING)
+        if skipped_machines:
+            LOG.debug(
+                "The following minion machines were skipped during the "
+                "refreshing of minion pool '%s' as they were in other "
+                "statuses than the serviceable ones: %s",
+                minion_pool.id, skipped_machines)
 
-        return pool_healthcheck_flow
+        return pool_refresh_flow
 
     @minion_manager_utils.minion_pool_synchronized_op
-    def healthcheck_minion_pool(self, ctxt, minion_pool_id):
+    def refresh_minion_pool(self, ctxt, minion_pool_id):
         LOG.info("Attempting to healthcheck Minion Pool '%s'.", minion_pool_id)
         minion_pool = self._get_minion_pool(
             ctxt, minion_pool_id, include_events=False, include_machines=True,
             include_progress_updates=False)
-        endpoint_dict = self._conductor_client.get_endpoint(
+        endpoint_dict = self._rpc_conductor_client.get_endpoint(
             ctxt, minion_pool.endpoint_id)
         acceptable_allocation_statuses = [
             constants.MINION_POOL_STATUS_ALLOCATED]
@@ -977,25 +1148,24 @@ class MinionManagerServerEndpoint(object):
                     minion_pool_id, current_status,
                     acceptable_allocation_statuses))
 
-        healthcheck_flow = self._get_minion_pool_healthcheck_flow(
+        healthcheck_flow = self._get_minion_pool_refresh_flow(
             ctxt, minion_pool, requery=False)
+        if not healthcheck_flow:
+            msg = (
+                "There are no minion machine healthchecks to be performed at "
+                "this time." % minion_pool_id)
+            LOG.debug(msg)
+            db_api.add_minion_pool_event(
+                ctxt, minion_pool.id, constants.TASK_EVENT_INFO, msg)
+            return self._get_minion_pool(ctxt, minion_pool.id)
+
         initial_store = self._get_pool_initial_taskflow_store_base(
             ctxt, minion_pool, endpoint_dict)
-
-        try:
-            db_api.set_minion_pool_status(
-                ctxt, minion_pool_id,
-                constants.MINION_POOL_STATUS_POOL_MAINTENANCE)
-            self._taskflow_runner.run_flow_in_background(
-                healthcheck_flow, store=initial_store)
-        except:
-            db_api.set_minion_pool_status(
-                ctxt, minion_pool_id, current_status)
-            raise
+        self._taskflow_runner.run_flow_in_background(
+            healthcheck_flow, store=initial_store)
 
         return self._get_minion_pool(ctxt, minion_pool.id)
 
-
     def _get_minion_pool_allocation_flow(self, minion_pool):
         """ Returns a taskflow.Flow object pertaining to all the tasks
         required for allocating a minion pool (validation, shared resource
@@ -1071,7 +1241,7 @@ class MinionManagerServerEndpoint(object):
             minion_max_idle_time, minion_retention_strategy, notes=None,
             skip_allocation=False):
 
-        endpoint_dict = self._conductor_client.get_endpoint(
+        endpoint_dict = self._rpc_conductor_client.get_endpoint(
             ctxt, endpoint_id)
         minion_pool = models.MinionPool()
         minion_pool.id = str(uuid.uuid4())
@@ -1153,7 +1323,7 @@ class MinionManagerServerEndpoint(object):
         minion_pool = self._get_minion_pool(
             ctxt, minion_pool_id, include_events=False, include_machines=False,
             include_progress_updates=False)
-        endpoint_dict = self._conductor_client.get_endpoint(
+        endpoint_dict = self._rpc_conductor_client.get_endpoint(
             ctxt, minion_pool.endpoint_id)
         acceptable_allocation_statuses = [
             constants.MINION_POOL_STATUS_DEALLOCATED]
@@ -1267,7 +1437,7 @@ class MinionManagerServerEndpoint(object):
                     minion_pool_id)
         self._check_pool_machines_in_use(
             ctxt, minion_pool, raise_if_in_use=not force)
-        endpoint_dict = self._conductor_client.get_endpoint(
+        endpoint_dict = self._rpc_conductor_client.get_endpoint(
             ctxt, minion_pool.endpoint_id)
 
         deallocation_flow = self._get_minion_pool_deallocation_flow(
@@ -1987,7 +2157,7 @@ class MinionManagerServerEndpoint(object):
     #             allocated_osmorphing_machine_ids))
     #         db_api.set_minion_machines_allocation_statuses(
     #             ctxt, all_machine_ids, action['id'],
-    #             constants.MINION_MACHINE_STATUS_ALLOCATED,
+    #             constants.MINION_MACHINE_STATUS_IN_USE,
     #             refresh_allocation_time=True)
 
     #     # filter out redundancies:

+ 85 - 50
coriolis/minion_manager/rpc/tasks.py

@@ -31,7 +31,7 @@ MINION_POOL_REPLICA_ALLOCATION_SUBFLOW_NAME_FORMAT = (
     "replica-%s-minions-machines-allocation")
 MINION_POOL_ALLOCATION_FLOW_NAME_FORMAT = "pool-%s-allocation"
 MINION_POOL_DEALLOCATION_FLOW_NAME_FORMAT = "pool-%s-deallocation"
-MINION_POOL_HEALTHCHECK_FLOW_NAME_FORMAT = "pool-%s-healthcheck"
+MINION_POOL_REFRESH_FLOW_NAME_FORMAT = "pool-%s-refresh"
 MINION_POOL_VALIDATION_TASK_NAME_FORMAT = "pool-%s-validation"
 MINION_POOL_UPDATE_STATUS_TASK_NAME_FORMAT = "pool-%s-update-status-%s"
 MINION_POOL_HEALTHCHECK_MACHINE_TASK_NAME_FORMAT = (
@@ -97,6 +97,25 @@ class MinionManagerTaskEventMixin(object):
                     minion_machine_id))
         return machine
 
+    def _set_minion_pool_status(self, ctxt, minion_pool_id, new_status):
+        with minion_manager_utils.get_minion_pool_lock(
+                minion_pool_id, external=True):  # status write runs under this pool's lock
+            db_api.set_minion_pool_status(ctxt, minion_pool_id, new_status)
+
+    def _update_minion_machine(
+            self, ctxt, minion_pool_id, minion_machine_id, updated_values):
+        with minion_manager_utils.get_minion_pool_lock(
+                minion_pool_id, external=True):  # lock is keyed on the pool ID, not the machine ID
+            db_api.update_minion_machine(
+                ctxt, minion_machine_id, updated_values)
+
+    def _set_minion_machine_status(
+            self, ctxt, minion_pool_id, minion_machine_id, new_status):
+        with minion_manager_utils.get_minion_pool_lock(
+                minion_pool_id, external=True):  # lock is keyed on the pool ID, not the machine ID
+            db_api.set_minion_machine_status(
+                ctxt, minion_machine_id, new_status)
+
 
 class _BaseReportMinionAllocationFailureForActionTask(
         coriolis_taskflow_base.BaseCoriolisTaskflowTask,
@@ -210,13 +229,13 @@ class _BaseConfirmMinionAllocationForActionTask(
         def _check_minion_properties(
                 minion_machine, instance, minion_purpose="unknown"):
             if minion_machine.status != (
-                    constants.MINION_MACHINE_STATUS_ALLOCATED):
+                    constants.MINION_MACHINE_STATUS_IN_USE):
                 raise exception.InvalidMinionMachineState(
                     "Minion machine with ID '%s' is in '%s' status instead "
                     "of the expected '%s' for it to be used as a '%s' "
                     "minion for instance '%s' of transfer action '%s'." % (
                         minion_machine.id, minion_machine.status,
-                        constants.MINION_MACHINE_STATUS_ALLOCATED,
+                        constants.MINION_MACHINE_STATUS_IN_USE,
                         minion_purpose, instance, self._action_id))
 
             if minion_machine.allocated_action != self._action_id:
@@ -375,7 +394,7 @@ class UpdateMinionPoolStatusTask(
                 "to '%s'." % (
                     self._task_name, self._minion_pool_id,
                     self._previous_status, self._target_status))
-            db_api.set_minion_pool_status(
+            self._set_minion_pool_status(
                 context, self._minion_pool_id, self._target_status)
             self._add_minion_pool_event(
                 context,
@@ -422,7 +441,7 @@ class UpdateMinionPoolStatusTask(
                 "'%s'" % (
                     self._task_name, self._minion_pool_id, minion_pool.status,
                     previous_status))
-            db_api.set_minion_pool_status(
+            self._set_minion_pool_status(
                 context, self._minion_pool_id, previous_status)
             self._add_minion_pool_event(
                 context,
@@ -702,17 +721,15 @@ class AllocateMinionMachineTask(BaseMinionManangerTask):
                 "[Task '%s'] Found existing entry in DB for minion machine "
                 "'%s'. Reusing that for deployment task.",
                 self._task_name, self._minion_machine_id)
-            with minion_manager_utils.get_minion_pool_lock(
-                    self._minion_pool_id, external=True):
-                db_api.update_minion_machine(
-                    context, self._minion_machine_id,
-                    {"status": constants.MINION_MACHINE_STATUS_DEPLOYING})
+            self._set_minion_machine_status(
+                context, self._minion_pool_id, self._minion_machine_id,
+                constants.MINION_MACHINE_STATUS_ALLOCATING)
         else:
             minion_machine = models.MinionMachine()
             minion_machine.id = self._minion_machine_id
             minion_machine.pool_id = self._minion_pool_id
             minion_machine.status = (
-                constants.MINION_MACHINE_STATUS_DEPLOYING)
+                constants.MINION_MACHINE_STATUS_ALLOCATING)
             log_msg = (
                 "[Task '%s'] Adding new minion machine with ID '%s' "
                 "to the DB" % (self._task_name, self._minion_machine_id))
@@ -743,9 +760,9 @@ class AllocateMinionMachineTask(BaseMinionManangerTask):
             res = super(AllocateMinionMachineTask, self).execute(
                 context, origin, destination, execution_info)
         except:
-            db_api.update_minion_machine(
-                context, self._minion_machine_id, {
-                    "status": constants.MINION_MACHINE_STATUS_ERROR_DEPLOYING})
+            self._set_minion_machine_status(
+                context, self._minion_pool_id, self._minion_machine_id,
+                constants.MINION_MACHINE_STATUS_ERROR_DEPLOYING)
             raise
 
         self._add_minion_pool_event(
@@ -754,7 +771,7 @@ class AllocateMinionMachineTask(BaseMinionManangerTask):
             "ID '%s'" % (self._minion_machine_id))
 
         updated_values = {
-            "allocated_at": timeutils.utcnow(),
+            "last_used_at": timeutils.utcnow(),
             "status": constants.MINION_MACHINE_STATUS_AVAILABLE,
             "connection_info": res['minion_connection_info'],
             "provider_properties": res['minion_provider_properties'],
@@ -763,9 +780,10 @@ class AllocateMinionMachineTask(BaseMinionManangerTask):
         if self._allocate_to_action:
             updated_values["allocated_action"] = self._allocate_to_action
             updated_values["status"] = (
-                constants.MINION_MACHINE_STATUS_ALLOCATED)
-        db_api.update_minion_machine(
-            context, self._minion_machine_id, updated_values)
+                constants.MINION_MACHINE_STATUS_IN_USE)
+        self._update_minion_machine(
+            context, self._minion_pool_id, self._minion_machine_id,
+            updated_values)
 
         return task_info
 
@@ -933,7 +951,9 @@ class DeallocateMinionMachineTask(BaseMinionManangerTask):
         LOG.debug(
             "[Task '%s'] Deleting minion machine with ID '%s' from the DB",
             self._task_name, self._minion_machine_id)
-        db_api.delete_minion_machine(context, self._minion_machine_id)
+        with minion_manager_utils.get_minion_pool_lock(
+                self._minion_pool_id, external=True):
+            db_api.delete_minion_machine(context, self._minion_machine_id)
 
         self._add_minion_pool_event(
             context,
@@ -948,8 +968,11 @@ class HealthcheckMinionMachineTask(BaseMinionManangerTask):
 
     def __init__(
             self, minion_pool_id, minion_machine_id, minion_pool_type,
-            fail_on_error=False, **kwargs):
+            fail_on_error=False,
+            machine_status_on_success=constants.MINION_MACHINE_STATUS_AVAILABLE,
+            **kwargs):
         self._fail_on_error = fail_on_error
+        self._machine_status_on_success = machine_status_on_success
         resource_healthcheck_task = (
             constants.TASK_TYPE_HEALTHCHECK_SOURCE_MINION)
         if minion_pool_type != constants.PROVIDER_PLATFORM_SOURCE:
@@ -989,17 +1012,23 @@ class HealthcheckMinionMachineTask(BaseMinionManangerTask):
                 context,
                 "Successfully healtchecked minion machine with internal "
                 "pool ID '%s'" % self._minion_machine_id)
+            self._set_minion_machine_status(
+                context, self._minion_pool_id, self._minion_machine_id,
+                self._machine_status_on_success)
         except Exception as ex:
             self._add_minion_pool_event(
                 context,
-                "Healtcheck for machine '%s' has failed." % (
-                    self._minion_machine_id),
+                "Healtcheck for machine with internal pool ID '%s' has "
+                "failed." % (self._minion_machine_id),
                 level=constants.TASK_EVENT_WARNING)
             LOG.debug(
                 "[Task '%s'] Healtcheck failed for machine '%s' of pool '%s'. "
                 "Full trace was:\n%s", self._task_name,
                 self._minion_machine_id, self._minion_pool_id,
                 utils.get_exception_details())
+            self._set_minion_machine_status(
+                context, self._minion_pool_id, self._minion_machine_id,
+                constants.MINION_MACHINE_STATUS_ERROR)
             if not self._fail_on_error:
                 res = {
                     "healthy": False,
@@ -1018,34 +1047,40 @@ class HealthcheckMinionMachineTask(BaseMinionManangerTask):
         return MINION_POOL_HEALTHCHECK_MACHINE_TASK_NAME_FORMAT % (
             minion_pool_id, minion_machine_id)
 
-    @classmethod
-    def make_minion_machine_healtcheck_decider(
-            cls, minion_pool_id, minion_machine_id,
-            on_successful_healthcheck=True):
-        def _healthcheck_decider(history):
-            healthcheck_task_name = (
-                cls.get_healthcheck_task_name(
-                    minion_pool_id, minion_machine_id))
 
-            if not history and healthcheck_task_name not in history:
-                LOG.warn(
-                    "Could not find healthceck result for minion machine '%s' "
-                    "of pool '%s' (task name '%s'). NOT grennlighting futher "
-                    "tasks.", minion_machine_id, minion_pool_id,
-                    healthcheck_task_name)
+class MinionMachineHealtchcheckDecider(object):
+    """ A callable to green/redlight further execution based on the result. """
 
-            healtcheck_result = history[healthcheck_task_name]
-            if healtcheck_result['healthy']:
-                LOG.debug(
-                    "Healtcheck task '%s' confirmed worker health. Decider "
-                    "returning %s", healthcheck_task_name,
-                    on_successful_healthcheck)
-                return on_successful_healthcheck
-            else:
-                LOG.debug(
-                    "Healtcheck task '%s' denied worker health. Decider "
-                    "returning %s", healthcheck_task_name,
-                    not on_successful_healthcheck)
-                return not on_successful_healthcheck
+    def __init__(
+            self, minion_pool_id, minion_machine_id,
+            on_successful_healthcheck=True):
+        self._minion_pool_id = minion_pool_id  # with the machine ID, used to derive the healthcheck task name
+        self._minion_machine_id = minion_machine_id
+        self._on_success = on_successful_healthcheck  # returned by __call__ for a healthy result, negated otherwise
+
+    def __call__(self, history):  # returns a bool that green/redlights further execution
+        healthcheck_task_name = (
+            HealthcheckMinionMachineTask.get_healthcheck_task_name(
+                self._minion_pool_id, self._minion_machine_id))
 
-        return _healthcheck_decider
+        if not history or healthcheck_task_name not in history:
+            LOG.warning(
+                "Could not find healthceck result for minion machine '%s' "
+                "of pool '%s' (task name '%s'). NOT grennlighting futher "
+                "tasks.", self._minion_machine_id, self._minion_pool_id,
+                healthcheck_task_name)
+            return False  # empty/None history or missing entry: never index into it below
+
+        healtcheck_result = history[healthcheck_task_name]  # dict-like; a missing 'healthy' key counts as unhealthy
+        if healtcheck_result.get('healthy'):
+            LOG.debug(
+                "Healtcheck task '%s' confirmed worker health. Decider "
+                "returning %s", healthcheck_task_name,
+                self._on_success)
+            return self._on_success
+        else:
+            LOG.debug(
+                "Healtcheck task '%s' denied worker health. Decider "
+                "returning %s", healthcheck_task_name,
+                not self._on_success)
+            return not self._on_success  # unhealthy result inverts the configured outcome

+ 2 - 2
coriolis/minion_pools/api.py

@@ -37,8 +37,8 @@ class API(object):
         return self._rpc_client.allocate_minion_pool(
             ctxt, minion_pool_id)
 
-    def healthcheck_minion_pool(self, ctxt, minion_pool_id):
-        return self._rpc_client.healthcheck_minion_pool(
+    def refresh_minion_pool(self, ctxt, minion_pool_id):
+        return self._rpc_client.refresh_minion_pool(  # forwarded as-is to self._rpc_client
             ctxt, minion_pool_id)
 
     def deallocate_minion_pool(self, ctxt, minion_pool_id, force=False):