Просмотр исходного кода

Merge pull request #74 from aznashwan/debug-os-morphing

Add 'debug_os_morphing_errors' flag to conductor.
Nashwan Azhari 6 лет назад
Родитель
Сommit
2470d9af42
2 измененных файлов с 69 добавлено и 4 удалено
  1. 68 4
      coriolis/conductor/rpc/server.py
  2. 1 0
      coriolis/constants.py

+ 68 - 4
coriolis/conductor/rpc/server.py

@@ -6,6 +6,7 @@ import functools
 import uuid
 import uuid
 
 
 from oslo_concurrency import lockutils
 from oslo_concurrency import lockutils
+from oslo_config import cfg
 from oslo_log import log as logging
 from oslo_log import log as logging
 
 
 from coriolis import constants
 from coriolis import constants
@@ -20,11 +21,24 @@ from coriolis import schemas
 from coriolis import utils
 from coriolis import utils
 from coriolis.worker.rpc import client as rpc_worker_client
 from coriolis.worker.rpc import client as rpc_worker_client
 
 
+
 VERSION = "1.0"
 VERSION = "1.0"
 
 
 LOG = logging.getLogger(__name__)
 LOG = logging.getLogger(__name__)
 
 
 
 
+conductor_opts = [
+    cfg.BoolOpt("debug_os_morphing_errors",
+                default=False,
+                help="If set, any OSMorphing task which errors out will have "
+                     "all of its following tasks unscheduled so as to allow "
+                     "for live debugging of the OSMorphing setup.")
+]
+
+CONF = cfg.CONF
+CONF.register_opts(conductor_opts, 'conductor')
+
+
 def endpoint_synchronized(func):
 def endpoint_synchronized(func):
     @functools.wraps(func)
     @functools.wraps(func)
     def wrapper(self, ctxt, endpoint_id, *args, **kwargs):
     def wrapper(self, ctxt, endpoint_id, *args, **kwargs):
@@ -989,6 +1003,11 @@ class ConductorServerEndpoint(object):
         task = db_api.get_task(ctxt, task_id)
         task = db_api.get_task(ctxt, task_id)
         execution = db_api.get_tasks_execution(ctxt, task.execution_id)
         execution = db_api.get_tasks_execution(ctxt, task.execution_id)
 
 
+        task_error_states = [
+            constants.TASK_STATUS_ERROR,
+            constants.TASK_STATUS_CANCELED,
+            constants.TASK_STATUS_CANCELED_FOR_DEBUGGING]
+
         action_id = execution.action_id
         action_id = execution.action_id
         with lockutils.lock(action_id):
         with lockutils.lock(action_id):
             LOG.info("Setting instance %(instance)s "
             LOG.info("Setting instance %(instance)s "
@@ -1010,8 +1029,7 @@ class ConductorServerEndpoint(object):
                     # The execution is in error status if there's one or more
                     # The execution is in error status if there's one or more
                     # tasks in error or canceled status
                     # tasks in error or canceled status
                     if [t for t in execution.tasks
                     if [t for t in execution.tasks
-                            if t.status in [constants.TASK_STATUS_ERROR,
-                                            constants.TASK_STATUS_CANCELED]]:
+                            if t.status in task_error_states]:
                         execution_status = constants.EXECUTION_STATUS_ERROR
                         execution_status = constants.EXECUTION_STATUS_ERROR
                     else:
                     else:
                         execution_status = constants.EXECUTION_STATUS_COMPLETED
                         execution_status = constants.EXECUTION_STATUS_COMPLETED
@@ -1019,6 +1037,24 @@ class ConductorServerEndpoint(object):
                     self._set_tasks_execution_status(
                     self._set_tasks_execution_status(
                         ctxt, execution.id, execution_status)
                         ctxt, execution.id, execution_status)
 
 
+    def _cancel_execution_for_osmorphing_debugging(self, ctxt, execution):
+        # go through all scheduled tasks and cancel them:
+        for subtask in execution.tasks:
+            if subtask.task_type == constants.TASK_TYPE_OS_MORPHING:
+                continue
+
+            if subtask.status == constants.TASK_STATUS_RUNNING:
+                raise execution.CoriolisException(
+                    "Task %s is still running although it should not!",
+                    subtask.id)
+
+            if subtask.status in [
+                    constants.TASK_STATUS_PENDING,
+                    constants.TASK_STATUS_ON_ERROR_ONLY]:
+                db_api.set_task_status(
+                    ctxt, subtask.id,
+                    constants.TASK_STATUS_CANCELED_FOR_DEBUGGING)
+
     @task_synchronized
     @task_synchronized
     def set_task_error(self, ctxt, task_id, exception_details):
     def set_task_error(self, ctxt, task_id, exception_details):
         LOG.error("Task error: %(task_id)s - %(ex)s",
         LOG.error("Task error: %(task_id)s - %(ex)s",
@@ -1031,12 +1067,40 @@ class ConductorServerEndpoint(object):
         execution = db_api.get_tasks_execution(ctxt, task.execution_id)
         execution = db_api.get_tasks_execution(ctxt, task.execution_id)
 
 
         action_id = execution.action_id
         action_id = execution.action_id
+        action = db_api.get_action(ctxt, action_id)
         with lockutils.lock(action_id):
         with lockutils.lock(action_id):
-            self._cancel_tasks_execution(ctxt, execution)
+            if task.task_type == constants.TASK_TYPE_OS_MORPHING and (
+                    CONF.conductor.debug_os_morphing_errors):
+                LOG.debug("Attempting to cancel execution '%s' for OSMorphing "
+                          "debugging.", execution.id)
+                # NOTE: the OSMorphing task always runs by itself so no
+                # further tasks should be running, but we double-check here:
+                running = [
+                    t for t in execution.tasks
+                    if t.status == constants.TASK_STATUS_RUNNING
+                    and t.task_type != constants.TASK_TYPE_OS_MORPHING]
+                if not running:
+                    self._cancel_execution_for_osmorphing_debugging(
+                        ctxt, execution)
+                    LOG.warn(
+                        "All subtasks for Migration '%s' have been cancelled "
+                        "to allow for OSMorphing debugging. The connection "
+                        "info for the worker VM is: %s",
+                        action_id, action.info.get(task.instance, {}).get(
+                            'osmorphing_connection_info', {}))
+                    self._set_tasks_execution_status(
+                        ctxt, execution.id, constants.EXECUTION_STATUS_ERROR)
+                else:
+                    LOG.warn(
+                        "Some tasks are running in parallel with the "
+                        "OSMorphing task, a debug setup cannot be safely "
+                        "achieved. Proceeding with cleanup tasks as usual.")
+                    self._cancel_tasks_execution(ctxt, execution)
+            else:
+                self._cancel_tasks_execution(ctxt, execution)
 
 
         # NOTE: if this is a migration, make sure to delete
         # NOTE: if this is a migration, make sure to delete
         # its associated reservation.
         # its associated reservation.
-        action = db_api.get_action(ctxt, action_id)
         if action.type == "migration":
         if action.type == "migration":
             self._check_delete_reservation_for_transfer(action)
             self._check_delete_reservation_for_transfer(action)
 
 

+ 1 - 0
coriolis/constants.py

@@ -10,6 +10,7 @@ TASK_STATUS_RUNNING = "RUNNING"
 TASK_STATUS_COMPLETED = "COMPLETED"
 TASK_STATUS_COMPLETED = "COMPLETED"
 TASK_STATUS_ERROR = "ERROR"
 TASK_STATUS_ERROR = "ERROR"
 TASK_STATUS_CANCELED = "CANCELED"
 TASK_STATUS_CANCELED = "CANCELED"
+TASK_STATUS_CANCELED_FOR_DEBUGGING = "CANCELED_FOR_DEBUGGING"
 TASK_STATUS_ON_ERROR_ONLY = "EXECUTE_ON_ERROR_ONLY"
 TASK_STATUS_ON_ERROR_ONLY = "EXECUTE_ON_ERROR_ONLY"
 
 
 TASK_TYPE_EXPORT_INSTANCE = "EXPORT_INSTANCE"
 TASK_TYPE_EXPORT_INSTANCE = "EXPORT_INSTANCE"