|
|
@@ -6,6 +6,7 @@ import functools
|
|
|
import uuid
|
|
|
|
|
|
from oslo_concurrency import lockutils
|
|
|
+from oslo_config import cfg
|
|
|
from oslo_log import log as logging
|
|
|
|
|
|
from coriolis import constants
|
|
|
@@ -20,11 +21,24 @@ from coriolis import schemas
|
|
|
from coriolis import utils
|
|
|
from coriolis.worker.rpc import client as rpc_worker_client
|
|
|
|
|
|
+
|
|
|
VERSION = "1.0"
|
|
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
+# Options registered under the 'conductor' configuration group.
+conductor_opts = [
+    cfg.BoolOpt(
+        "debug_os_morphing_errors",
+        default=False,
+        help=(
+            "If set, any OSMorphing task which errors out will have "
+            "all of its following tasks unscheduled so as to allow "
+            "for live debugging of the OSMorphing setup."
+        ),
+    ),
+]
+
+# Global oslo.config handle; the options above become reachable as
+# CONF.conductor.<option name> once registered.
+CONF = cfg.CONF
+CONF.register_opts(conductor_opts, group='conductor')
|
|
|
+
|
|
|
+
|
|
|
def endpoint_synchronized(func):
|
|
|
@functools.wraps(func)
|
|
|
def wrapper(self, ctxt, endpoint_id, *args, **kwargs):
|
|
|
@@ -989,6 +1003,11 @@ class ConductorServerEndpoint(object):
|
|
|
task = db_api.get_task(ctxt, task_id)
|
|
|
execution = db_api.get_tasks_execution(ctxt, task.execution_id)
|
|
|
|
|
|
+ task_error_states = [
|
|
|
+ constants.TASK_STATUS_ERROR,
|
|
|
+ constants.TASK_STATUS_CANCELED,
|
|
|
+ constants.TASK_STATUS_CANCELED_FOR_DEBUGGING]
|
|
|
+
|
|
|
action_id = execution.action_id
|
|
|
with lockutils.lock(action_id):
|
|
|
LOG.info("Setting instance %(instance)s "
|
|
|
@@ -1010,8 +1029,7 @@ class ConductorServerEndpoint(object):
|
|
|
# The execution is in error status if there's one or more
|
|
|
# tasks in error or canceled status
|
|
|
if [t for t in execution.tasks
|
|
|
- if t.status in [constants.TASK_STATUS_ERROR,
|
|
|
- constants.TASK_STATUS_CANCELED]]:
|
|
|
+ if t.status in task_error_states]:
|
|
|
execution_status = constants.EXECUTION_STATUS_ERROR
|
|
|
else:
|
|
|
execution_status = constants.EXECUTION_STATUS_COMPLETED
|
|
|
@@ -1019,6 +1037,24 @@ class ConductorServerEndpoint(object):
|
|
|
self._set_tasks_execution_status(
|
|
|
ctxt, execution.id, execution_status)
|
|
|
|
|
|
+    def _cancel_execution_for_osmorphing_debugging(self, ctxt, execution):
+        """Unschedule the not-yet-started tasks of the given execution.
+
+        Walks through the tasks of `execution`, skipping the OSMorphing
+        ones, and marks every task which is still in PENDING or
+        ON_ERROR_ONLY status as CANCELED_FOR_DEBUGGING. Tasks in any other
+        (non-RUNNING) status are left as they are.
+
+        :param ctxt: request context, passed through to the DB API.
+        :param execution: tasks execution whose `tasks` are inspected.
+        :raises CoriolisException: if a non-OSMorphing task is found in
+            RUNNING status. Tasks visited before it have already been
+            updated by then.
+        """
+        # NOTE(review): imported locally so that this method does not depend
+        # on a module-level import which is not visible in this change; drop
+        # it if the module already imports `exception`.
+        from coriolis import exception
+
+        # go through all scheduled tasks and cancel them:
+        for subtask in execution.tasks:
+            # OSMorphing tasks themselves are never touched here.
+            if subtask.task_type == constants.TASK_TYPE_OS_MORPHING:
+                continue
+
+            if subtask.status == constants.TASK_STATUS_RUNNING:
+                # The exception class lives in the `exception` module, not on
+                # the `execution` argument; the message must be interpolated
+                # explicitly since exceptions do not apply %-args themselves.
+                raise exception.CoriolisException(
+                    "Task %s is still running although it should not!" %
+                    subtask.id)
+
+            if subtask.status in (
+                    constants.TASK_STATUS_PENDING,
+                    constants.TASK_STATUS_ON_ERROR_ONLY):
+                db_api.set_task_status(
+                    ctxt, subtask.id,
+                    constants.TASK_STATUS_CANCELED_FOR_DEBUGGING)
|
|
|
+
|
|
|
@task_synchronized
|
|
|
def set_task_error(self, ctxt, task_id, exception_details):
|
|
|
LOG.error("Task error: %(task_id)s - %(ex)s",
|
|
|
@@ -1031,12 +1067,40 @@ class ConductorServerEndpoint(object):
|
|
|
execution = db_api.get_tasks_execution(ctxt, task.execution_id)
|
|
|
|
|
|
action_id = execution.action_id
|
|
|
+ action = db_api.get_action(ctxt, action_id)
|
|
|
with lockutils.lock(action_id):
|
|
|
- self._cancel_tasks_execution(ctxt, execution)
|
|
|
+ if task.task_type == constants.TASK_TYPE_OS_MORPHING and (
|
|
|
+ CONF.conductor.debug_os_morphing_errors):
|
|
|
+ LOG.debug("Attempting to cancel execution '%s' for OSMorphing "
|
|
|
+ "debugging.", execution.id)
|
|
|
+ # NOTE: the OSMorphing task always runs by itself so no
|
|
|
+ # further tasks should be running, but we double-check here:
|
|
|
+ running = [
|
|
|
+ t for t in execution.tasks
|
|
|
+ if t.status == constants.TASK_STATUS_RUNNING
|
|
|
+ and t.task_type != constants.TASK_TYPE_OS_MORPHING]
|
|
|
+ if not running:
|
|
|
+ self._cancel_execution_for_osmorphing_debugging(
|
|
|
+ ctxt, execution)
|
|
|
+ LOG.warn(
|
|
|
+ "All subtasks for Migration '%s' have been cancelled "
|
|
|
+ "to allow for OSMorphing debugging. The connection "
|
|
|
+ "info for the worker VM is: %s",
|
|
|
+ action_id, action.info.get(task.instance, {}).get(
|
|
|
+ 'osmorphing_connection_info', {}))
|
|
|
+ self._set_tasks_execution_status(
|
|
|
+ ctxt, execution.id, constants.EXECUTION_STATUS_ERROR)
|
|
|
+ else:
|
|
|
+ LOG.warn(
|
|
|
+ "Some tasks are running in parallel with the "
|
|
|
+ "OSMorphing task, a debug setup cannot be safely "
|
|
|
+ "achieved. Proceeding with cleanup tasks as usual.")
|
|
|
+ self._cancel_tasks_execution(ctxt, execution)
|
|
|
+ else:
|
|
|
+ self._cancel_tasks_execution(ctxt, execution)
|
|
|
|
|
|
# NOTE: if this is a migration, make sure to delete
|
|
|
# its associated reservation.
|
|
|
- action = db_api.get_action(ctxt, action_id)
|
|
|
if action.type == "migration":
|
|
|
self._check_delete_reservation_for_transfer(action)
|
|
|
|