|
|
@@ -1496,6 +1496,17 @@ class ConductorServerEndpoint(object):
|
|
|
action = db_api.get_action(ctxt, execution.action_id)
|
|
|
|
|
|
def _start_task(task):
|
|
|
+ task_info = None
|
|
|
+ if task.instance not in action.info:
|
|
|
+ LOG.error(
|
|
|
+ "No info present for instance '%s' in action '%s' for task"
|
|
|
+ " '%s' (type '%s') of execution '%s' (type '%s'). "
|
|
|
+ "Defaulting to empty dict." % (
|
|
|
+ task.instance, action.id, task.id, task.task_type,
|
|
|
+ execution.id, execution.type))
|
|
|
+ task_info = {}
|
|
|
+ else:
|
|
|
+ task_info = action.info[task.instance]
|
|
|
db_api.set_task_status(
|
|
|
ctxt, task.id, constants.TASK_STATUS_PENDING)
|
|
|
self._rpc_worker_client.begin_task(
|
|
|
@@ -1505,7 +1516,7 @@ class ConductorServerEndpoint(object):
|
|
|
origin=origin,
|
|
|
destination=destination,
|
|
|
instance=task.instance,
|
|
|
- task_info=action.info.get(task.instance, {}))
|
|
|
+ task_info=task_info)
|
|
|
started_tasks.append(task.id)
|
|
|
|
|
|
# aggregate tasks and statuses:
|
|
|
@@ -1705,11 +1716,23 @@ class ConductorServerEndpoint(object):
|
|
|
task_type = task.task_type
|
|
|
|
|
|
if task_type == constants.TASK_TYPE_RESTORE_REPLICA_DISK_SNAPSHOTS:
|
|
|
+
|
|
|
# When restoring a snapshot in some import providers (OpenStack),
|
|
|
# a new volume_id is generated. This needs to be updated in the
|
|
|
# Replica instance as well.
|
|
|
- volumes_info = task_info.get("volumes_info", [])
|
|
|
- if volumes_info:
|
|
|
+ volumes_info = task_info.get('volumes_info')
|
|
|
+ if not volumes_info:
|
|
|
+ LOG.warn(
|
|
|
+ "No volumes_info was provided by task '%s' (type '%s') "
|
|
|
+ "after completion. NOT updating parent action '%s'",
|
|
|
+ task.id, task_type, execution.action_id)
|
|
|
+ else:
|
|
|
+ LOG.debug(
|
|
|
+ "Updating volumes_info for instance '%s' in parent action "
|
|
|
+ "'%s' following completion of task '%s' (type '%s'): %s",
|
|
|
+ task.instance, execution.action_id, task.id, task_type,
|
|
|
+ utils.filter_chunking_info_for_task(
|
|
|
+ {'volumes_info': volumes_info}))
|
|
|
self._update_volumes_info_for_migration_parent_replica(
|
|
|
ctxt, execution.action_id, task.instance,
|
|
|
{"volumes_info": volumes_info})
|
|
|
@@ -1720,6 +1743,12 @@ class ConductorServerEndpoint(object):
|
|
|
# The migration completed. If the replica is executed again,
|
|
|
# new volumes need to be deployed in place of the migrated
|
|
|
# ones.
|
|
|
+ LOG.info(
|
|
|
+ "Unsetting 'volumes_info' for instance '%s' in Replica "
|
|
|
+ "'%s' after completion of Replica task '%s' (type '%s') "
|
|
|
+ "with clone_disks=False.",
|
|
|
+ task.instance, execution.action_id, task.id,
|
|
|
+ task_type)
|
|
|
self._update_volumes_info_for_migration_parent_replica(
|
|
|
ctxt, execution.action_id, task.instance,
|
|
|
{"volumes_info": []})
|
|
|
@@ -1778,6 +1807,10 @@ class ConductorServerEndpoint(object):
|
|
|
# as they are in the DB:
|
|
|
db_api.update_replica(
|
|
|
ctxt, execution.action_id, task_info)
|
|
|
+ else:
|
|
|
+ LOG.debug(
|
|
|
+ "No post-task actions required for task '%s' of type '%s'",
|
|
|
+ task.id, task_type)
|
|
|
|
|
|
@task_and_execution_synchronized
|
|
|
def task_completed(self, ctxt, task_id, task_result):
|