Просмотр исходного кода

Fix automatic pool refreshing.

Nashwan Azhari 5 лет назад
Родитель
Сommit
e1b1b3aaa0

+ 2 - 2
coriolis/api/v1/minion_pool_actions.py

@@ -31,11 +31,11 @@ class MinionPoolActionsController(api_wsgi.Controller):
             raise exc.HTTPNotFound(explanation=ex.msg)
 
     @api_wsgi.action('refresh')
-    def _healthcheck_pool(self, req, id, body):
+    def _refresh_pool(self, req, id, body):
         context = req.environ['coriolis.context']
         context.can(
             minion_pool_policies.get_minion_pools_policy_label(
-                "healthcheck"))
+                "refresh"))
         try:
             return minion_pool_view.single(
                 req, self.minion_pool_api.refresh_minion_pool(

+ 17 - 3
coriolis/cron/cron.py

@@ -155,17 +155,31 @@ class Cron(object):
         if not isinstance(job, CronJob):
             raise ValueError("Invalid job class")
         name = job.name
+        LOG.debug("Registering cron job with name '%s'", name)
         with self._semaphore:
             self._jobs[name] = job
 
     def unregister(self, name):
         job = self._jobs.get(name)
         if job:
+            LOG.debug("Unregistering cron job with name '%s'", name)
             with self._semaphore:
                 del self._jobs[name]
 
+    def unregister_jobs_with_prefix(self, prefix):
+        jobs = [
+            job for job in self._jobs
+            if job.startswith(prefix)]
+        if jobs:
+            LOG.debug(
+                "Unregistering the following cron jobs based on "
+                "the requested prefix ('%s'): %s", prefix, jobs)
+            with self._semaphore:
+                for job in jobs:
+                    del self._jobs[job]
+
     def _check_jobs(self):
-        LOG.debug("Checking jobs")
+        LOG.debug("Checking cron jobs")
         jobs = self._jobs.copy()
         job_nr = len(jobs)
         spawned = 0
@@ -206,8 +220,8 @@ class Cron(object):
                     "job_err": error})
             if result:
                 LOG.info("Job %(desc)s returned: %(ret)r" % {
-                    "job_desc": desc,
-                    "job_ret": result})
+                    "desc": desc,
+                    "ret": result})
 
     def _janitor(self):
         # remove expired jobs from memory. The check for expired

+ 1 - 2
coriolis/db/sqlalchemy/migrate_repo/versions/016_adds_minion_vm_pools.py

@@ -36,8 +36,7 @@ def upgrade(migrate_engine):
             sqlalchemy.Column(
                 "project_id", sqlalchemy.String(255), nullable=False),
             sqlalchemy.Column(
-                "maintenance_trust_id",
-                sqlalchemy.String(255), nullable=False),
+                "maintenance_trust_id", sqlalchemy.String(255), nullable=True),
             sqlalchemy.Column('created_at', sqlalchemy.DateTime),
             sqlalchemy.Column('updated_at', sqlalchemy.DateTime),
             sqlalchemy.Column('deleted_at', sqlalchemy.DateTime),

+ 1 - 1
coriolis/db/sqlalchemy/models.py

@@ -535,7 +535,7 @@ class MinionPool(
     user_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
     project_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
     maintenance_trust_id = sqlalchemy.Column(
-        sqlalchemy.String(255), nullable=False)
+        sqlalchemy.String(255), nullable=True)
 
     name = sqlalchemy.Column(
         sqlalchemy.String(255),

+ 102 - 25
coriolis/minion_manager/rpc/server.py

@@ -2,7 +2,6 @@
 # All Rights Reserved.
 
 import datetime
-import itertools
 import math
 import uuid
 
@@ -44,6 +43,7 @@ MINION_MANAGER_OPTS = [
 CONF = cfg.CONF
 CONF.register_opts(MINION_MANAGER_OPTS, 'minion_manager')
 
+MINION_POOL_REFRESH_JOB_PREFIX_FORMAT = "pool-%s-refresh"
 MINION_POOL_REFRESH_CRON_JOB_NAME_FORMAT = "pool-%s-refresh-minute-%d"
 MINION_POOL_REFRESH_CRON_JOB_DESCRIPTION_FORMAT = (
     "Regularly scheduled refresh job for minion pool '%s' on minute %d.")
@@ -62,15 +62,25 @@ def _trigger_pool_refresh(ctxt, minion_manager_client, minion_pool_id):
 class MinionManagerServerEndpoint(object):
 
     def __init__(self):
-        self._cron = cron.Cron()
         self._admin_ctxt = context.get_admin_context()
-        # self._init_cron()
+        try:
+            self._cron = cron.Cron()
+            self._init_pools_refresh_cron_jobs()
+            self._cron.start()
+        except Exception as ex:
+            LOG.warn(
+                "A fatal exception occurred while attempting to set up cron "
+                "jobs for automatic pool refreshing. Automatic refreshing will"
+                " not be perfomed until the issue is fixed and the service is "
+                "restarted. Exception details were: %s",
+                utils.get_exception_details())
 
-    def _init_cron(self):
-        now = timeutils.utcnow()
+    def _init_pools_refresh_cron_jobs(self):
         minion_pools = db_api.get_minion_pools(
             self._admin_ctxt, include_machines=False,
-            include_progress_updates=False, include_events=False)
+            include_progress_updates=False, include_events=False,
+            to_dict=False)
+
         for minion_pool in minion_pools:
             active_pool_statuses = [constants.MINION_POOL_STATUS_ALLOCATED]
             if minion_pool.status not in active_pool_statuses:
@@ -79,13 +89,28 @@ class MinionManagerServerEndpoint(object):
                     "as it is in an inactive status '%s'.",
                     minion_pool.id, minion_pool.status)
                 continue
+
+            if not minion_pool.maintenance_trust_id:
+                LOG.warn(
+                    "Minion Pool with ID '%s' had no maintenance trust "
+                    "ID associated with it. Cannot set up automatic "
+                    "refreshing during startup. Skipping.",
+                    minion_pool.id)
+                continue
+
             LOG.debug(
                 "Adding refresh schedule for minion pool '%s' as part of "
                 "server startup.", minion_pool.id)
-            self._register_refresh_jobs_for_minion_pool(minion_pool, date=now)
+            try:
+                self._register_refresh_jobs_for_minion_pool(minion_pool)
+            except Exception as ex:
+                LOG.warn(
+                    "An Exception occurred while setting up automatic "
+                    "refreshing for minion pool with ID '%s'. Error was: %s",
+                    minion_pool.id, utils.get_exception_details())
 
     def _register_refresh_jobs_for_minion_pool(
-            self, minion_pool, date=None, period_minutes=None):
+            self, minion_pool, period_minutes=None):
         if not period_minutes:
             period_minutes = CONF.minion_manager.minion_pool_default_refresh_period_minutes
         if period_minutes <= 0:
@@ -98,9 +123,8 @@ class MinionManagerServerEndpoint(object):
                 "Selected pool refresh period_minutes is greater than 60, defaulting "
                 "to 10. Original value was: %s", period_minutes)
             period_minutes = 10
-        if not date:
-            date = timeutils.utcnow()
-        admin_ctxt = context.get_admin_context()
+        admin_ctxt = context.get_admin_context(
+            minion_pool.maintenance_trust_id)
         description = (
             "Scheduled refresh job for minion pool '%s'" % minion_pool.id)
 
@@ -119,6 +143,22 @@ class MinionManagerServerEndpoint(object):
                     None, _trigger_pool_refresh, admin_ctxt,
                     self._rpc_minion_manager_client, minion_pool.id))
 
+    def _unregister_refresh_jobs_for_minion_pool(
+            self, minion_pool, raise_on_error=True):
+        job_prefix = MINION_POOL_REFRESH_JOB_PREFIX_FORMAT % (
+            minion_pool.id)
+        try:
+            self._cron.unregister_jobs_with_prefix(job_prefix)
+        except Exception as ex:
+            if not raise_on_error:
+                LOG.warn(
+                    "Exception occurred while unregistering minion pool "
+                    "refresh  cron jobs for pool with ID '%s'. "
+                    "Exception was: %s",
+                    minion_pool.id, utils.get_exception_details())
+            else:
+                raise
+
     @property
     def _taskflow_runner(self):
         return taskflow_runner.TaskFlowRunner(
@@ -440,7 +480,7 @@ class MinionManagerServerEndpoint(object):
                 include_osmorphing_minions=False)
         except Exception as ex:
             LOG.warn(
-                "Error occured while allocating minion machines for "
+                "Error occurred while allocating minion machines for "
                 "Replica with ID '%s'. Removing all allocations. "
                 "Error was: %s" % (
                     replica['id'], utils.get_exception_details()))
@@ -464,7 +504,7 @@ class MinionManagerServerEndpoint(object):
                 include_osmorphing_minions=include_osmorphing_minions)
         except Exception as ex:
             LOG.warn(
-                "Error occured while allocating minion machines for "
+                "Error occurred while allocating minion machines for "
                 "Migration with ID '%s'. Removing all allocations. "
                 "Error was: %s" % (
                     migration['id'], utils.get_exception_details()))
@@ -612,8 +652,8 @@ class MinionManagerServerEndpoint(object):
                 new_machine_db_entries_added.append(new_machine.id)
         except Exception as ex:
             LOG.warn(
-                "Exception occured while adding new minion machine entries to "
-                "the DB for pool '%s' for use with action '%s'. Clearing "
+                "Exception occurred while adding new minion machine entries to"
+                " the DB for pool '%s' for use with action '%s'. Clearing "
                 "any DB entries added so far (%s). Error was: %s",
                 minion_pool.id, action_id,
                 [m.id for m in new_machine_db_entries_added],
@@ -643,7 +683,7 @@ class MinionManagerServerEndpoint(object):
                     db_api.delete_minion_machine(ctxt, new_machine.id)
                 except Exception as ex:
                     LOG.warn(
-                        "Error occured while removing minion machine entry "
+                        "Error occurred while removing minion machine entry "
                         "'%s' from the DB. This may leave the pool in an "
                         "inconsistent state. Error trace was: %s" % (
                             new_machine.id, utils.get_exception_details()))
@@ -887,7 +927,7 @@ class MinionManagerServerEndpoint(object):
                 for minion_pool_id in pools_used:
                     self._add_minion_pool_event(
                         ctxt, minion_pool.id, constants.TASK_EVENT_ERROR,
-                        "A fatal exception occured while attempting to start "
+                        "A fatal exception occurred while attempting to start "
                         "the task flow for allocating machines for %s '%s'. "
                         "Forced deallocation and reallocation may be required."
                         " Please review the minion manager logs for additional"
@@ -1103,18 +1143,43 @@ class MinionManagerServerEndpoint(object):
                 ctxt, minion_pool.id, include_machines=True,
                 include_progress_updates=False, include_events=False)
 
+        # determine how many machines could be feasibily downscaled:
+        machine_statuses = {
+            machine.id: machine.status
+            for machine in minion_pool.minion_machines}
+        ignorable_machine_statuses = [
+            constants.MINION_MACHINE_STATUS_DEALLOCATING,
+            constants.MINION_MACHINE_STATUS_ERROR,
+            constants.MINION_MACHINE_STATUS_ERROR_DEPLOYING]
+        max_minions_to_deallocate = (
+            len([
+                mid for mid in machine_statuses
+                if machine_statuses[mid] not in ignorable_machine_statuses]) - (
+                    minion_pool.minimum_minions))
+        LOG.debug(
+            "Determined minion pool '%s' machine deallocation number to be %d "
+            "based on machines stauses: %s",
+            minion_pool.id, max_minions_to_deallocate, machine_statuses)
+
+        # define refresh flow and process all relevant machines:
         pool_refresh_flow = unordered_flow.Flow(
             minion_manager_tasks.MINION_POOL_REFRESH_FLOW_NAME_FORMAT % (
                 minion_pool.id))
-        max_minions_to_deallocate = (
-            len(minion_pool.minion_machines) - minion_pool.minimum_minions)
         now = timeutils.utcnow()
         machines_to_deallocate = []
         machines_to_healthcheck = []
         skipped_machines = {}
+        healthcheckable_machine_statuses = [
+            constants.MINION_MACHINE_STATUS_AVAILABLE,
+            # NOTE(aznashwan): this should help account for 'transient' issues
+            # where a minion which may have been marked as error'd at some
+            # point may be back online. Event if it isn't, the
+            # sublow redeploy it after the healthcheck fails:
+            constants.MINION_MACHINE_STATUS_ERROR,
+            constants.MINION_MACHINE_STATUS_ERROR_DEPLOYING]
 
         for machine in minion_pool.minion_machines:
-            if machine.status != constants.MINION_MACHINE_STATUS_AVAILABLE:
+            if machine.status not in healthcheckable_machine_statuses:
                 skipped_machines[machine.id] = machine.status
                 continue
 
@@ -1206,8 +1271,7 @@ class MinionManagerServerEndpoint(object):
         if not healthcheck_flow:
             msg = (
                 "There are no minion machine healthchecks to be performed at "
-                "this time." % minion_pool_id)
-            LOG.debug(msg)
+                "this time.")
             db_api.add_minion_pool_event(
                 ctxt, minion_pool.id, constants.TASK_EVENT_INFO, msg)
             return self._get_minion_pool(ctxt, minion_pool.id)
@@ -1320,9 +1384,12 @@ class MinionManagerServerEndpoint(object):
             self._add_minion_pool_event(
                 ctxt, minion_pool.id, constants.TASK_EVENT_INFO,
                 "Successfully added minion pool to the DB")
+            self._register_refresh_jobs_for_minion_pool(
+                minion_pool)
         except Exception:
             if cleanup_trust:
                 keystone.delete_trust(ctxt)
+            raise
 
         if not skip_allocation:
             allocation_flow = self._get_minion_pool_allocation_flow(
@@ -1336,7 +1403,7 @@ class MinionManagerServerEndpoint(object):
             except Exception as ex:
                 self._add_minion_pool_event(
                     ctxt, minion_pool.id, constants.TASK_EVENT_ERROR,
-                    "A fatal exception occured while attempting to start the "
+                    "A fatal exception occurred while attempting to start the "
                     "task flow for allocating the minion pool. Forced "
                     "deallocation and reallocation may be required. Please "
                     "review the manager logs for additional details. "
@@ -1425,13 +1492,15 @@ class MinionManagerServerEndpoint(object):
                 constants.MINION_POOL_STATUS_POOL_MAINTENANCE)
             self._taskflow_runner.run_flow_in_background(
                 allocation_flow, store=initial_store)
+            self._register_refresh_jobs_for_minion_pool(
+                minion_pool)
             self._add_minion_pool_event(
                 ctxt, minion_pool.id, constants.TASK_EVENT_INFO,
                 "Begun minion pool allocation process")
         except Exception as ex:
             self._add_minion_pool_event(
                 ctxt, minion_pool.id, constants.TASK_EVENT_ERROR,
-                "A fatal exception occured while attempting to start the "
+                "A fatal exception occurred while attempting to start the "
                 "task flow for allocating the minion pool. Forced "
                 "deallocation and reallocation may be required. Please "
                 "review the manager logs for additional details. "
@@ -1540,13 +1609,15 @@ class MinionManagerServerEndpoint(object):
                 constants.MINION_POOL_STATUS_POOL_MAINTENANCE)
             self._taskflow_runner.run_flow_in_background(
                 deallocation_flow, store=initial_store)
+            self._unregister_refresh_jobs_for_minion_pool(
+                minion_pool, raise_on_error=False)
             self._add_minion_pool_event(
                 ctxt, minion_pool.id, constants.TASK_EVENT_INFO,
                 "Begun minion pool deallocation process")
         except Exception as ex:
             self._add_minion_pool_event(
                 ctxt, minion_pool.id, constants.TASK_EVENT_ERROR,
-                "A fatal exception occured while attempting to start the "
+                "A fatal exception occurred while attempting to start the "
                 "task flow for deallocating the minion pool. Forced "
                 "deallocation and reallocation may be required. Please "
                 "review the manager logs for additional details. "
@@ -1617,6 +1688,12 @@ class MinionManagerServerEndpoint(object):
                 "resources have been torn down before deleting the pool." % (
                     minion_pool_id, minion_pool.status,
                     acceptable_deletion_statuses))
+        self._unregister_refresh_jobs_for_minion_pool(
+            minion_pool, raise_on_error=False)
 
         LOG.info("Deleting minion pool with ID '%s'" % minion_pool_id)
         db_api.delete_minion_pool(ctxt, minion_pool_id)
+        if minion_pool.maintenance_trust_id:
+            maintenance_ctxt = context.get_admin_context(
+                minion_pool.maintenance_trust_id)
+            keystone.delete_trust(maintenance_ctxt)

+ 3 - 7
coriolis/minion_manager/rpc/tasks.py

@@ -578,16 +578,12 @@ class AllocateSharedPoolResourcesTask(BaseMinionManangerTask):
         res = super(AllocateSharedPoolResourcesTask, self).execute(
             context, origin, destination, task_info)
         pool_shared_resources = res['pool_shared_resources']
-        self._add_minion_pool_event(
-            context, "Successfully deployed shared pool resources")
 
         updated_values = {
             "shared_resources": pool_shared_resources}
 
-        db_api.add_minion_pool_event(
-            context, self._minion_pool_id, constants.TASK_EVENT_INFO,
-            "Successfully deployed shared pool resources" % (
-                pool_shared_resources))
+        self._add_minion_pool_event(
+            context, "Successfully deployed shared pool resources")
         with minion_manager_utils.get_minion_pool_lock(
                 self._minion_pool_id, external=True):
             db_api.update_minion_pool(
@@ -888,7 +884,7 @@ class AllocateMinionMachineTask(BaseMinionManangerTask):
                     context, origin, destination, cleanup_info, **kwargs)
             except Exception as ex:
                 log_msg = (
-                    "[Task '%s'] Exception occured while attempting to revert "
+                    "[Task '%s'] Exception occurred while attempting to revert "
                     "deployment of minion machine with ID '%s' for pool '%s'." % (
                         self._task_name, self._minion_machine_id,
                         self._minion_pool_id))

+ 2 - 2
coriolis/policies/minion_pools.py

@@ -84,9 +84,9 @@ MINION_POOLS_DEFAULT_RULES = [
         ]
     ),
     policy.DocumentedRuleDefault(
-        get_minion_pools_policy_label('healthcheck'),
+        get_minion_pools_policy_label('refresh'),
         MINION_POOLS_DEFAULT_RULE,
-        "Healthcheck Minion Pool",
+        "Refresh Minion Pool",
         [
             {
                 "path": "/minion_pools/{minion_pool_id}/actions",

+ 1 - 1
coriolis/taskflow/runner.py

@@ -78,7 +78,7 @@ class TaskFlowRunner(object):
             engine.run()
         except Exception as ex:
             LOG.warn(
-                "Fatal error occured while attempting to run flow '%s'. "
+                "Fatal error occurred while attempting to run flow '%s'. "
                 "Full trace was: %s", flow.name, utils.get_exception_details())
             raise
         LOG.info(