|
|
@@ -52,10 +52,10 @@ def migration_synchronized(func):
|
|
|
|
|
|
def tasks_execution_synchronized(func):
|
|
|
@functools.wraps(func)
|
|
|
- def wrapper(self, ctxt, execution_id, *args, **kwargs):
|
|
|
+ def wrapper(self, ctxt, replica_id, execution_id, *args, **kwargs):
|
|
|
@lockutils.synchronized(execution_id)
|
|
|
def inner():
|
|
|
- return func(self, ctxt, execution_id, *args, **kwargs)
|
|
|
+ return func(self, ctxt, replica_id, execution_id, *args, **kwargs)
|
|
|
return inner()
|
|
|
return wrapper
|
|
|
|
|
|
@@ -154,7 +154,7 @@ class ConductorServerEndpoint(object):
|
|
|
LOG.info("Replica tasks execution created: %s", execution.id)
|
|
|
|
|
|
self._begin_tasks(ctxt, execution, replica.info)
|
|
|
- return self.get_replica_tasks_execution(ctxt, execution.id)
|
|
|
+ return self.get_replica_tasks_execution(ctxt, replica_id, execution.id)
|
|
|
|
|
|
@replica_synchronized
|
|
|
def get_replica_tasks_executions(self, ctxt, replica_id,
|
|
|
@@ -163,31 +163,32 @@ class ConductorServerEndpoint(object):
|
|
|
ctxt, replica_id, include_tasks)
|
|
|
|
|
|
@tasks_execution_synchronized
|
|
|
- def get_replica_tasks_execution(self, ctxt, execution_id):
|
|
|
+ def get_replica_tasks_execution(self, ctxt, replica_id, execution_id):
|
|
|
return self._get_replica_tasks_execution(
|
|
|
- ctxt, execution_id)
|
|
|
+ ctxt, replica_id, execution_id)
|
|
|
|
|
|
@tasks_execution_synchronized
|
|
|
- def delete_replica_tasks_execution(self, ctxt, execution_id):
|
|
|
+ def delete_replica_tasks_execution(self, ctxt, replica_id, execution_id):
|
|
|
execution = self._get_replica_tasks_execution(
|
|
|
- ctxt, execution_id)
|
|
|
+ ctxt, replica_id, execution_id)
|
|
|
if execution.status == constants.EXECUTION_STATUS_RUNNING:
|
|
|
raise exception.InvalidMigrationState(
|
|
|
"Cannot delete a running replica tasks execution")
|
|
|
db_api.delete_replica_tasks_execution(ctxt, execution_id)
|
|
|
|
|
|
@tasks_execution_synchronized
|
|
|
- def cancel_replica_tasks_execution(self, ctxt, execution_id, force):
|
|
|
+ def cancel_replica_tasks_execution(self, ctxt, replica_id, execution_id,
|
|
|
+ force):
|
|
|
execution = self._get_replica_tasks_execution(
|
|
|
- ctxt, execution_id)
|
|
|
+ ctxt, replica_id, execution_id)
|
|
|
if execution.status != constants.EXECUTION_STATUS_RUNNING:
|
|
|
raise exception.InvalidReplicaState(
|
|
|
"The replica tasks execution is not running")
|
|
|
self._cancel_tasks_execution(ctxt, execution, force)
|
|
|
|
|
|
- def _get_replica_tasks_execution(self, ctxt, execution_id):
|
|
|
+ def _get_replica_tasks_execution(self, ctxt, replica_id, execution_id):
|
|
|
execution = db_api.get_replica_tasks_execution(
|
|
|
- ctxt, execution_id)
|
|
|
+ ctxt, replica_id, execution_id)
|
|
|
if not execution:
|
|
|
raise exception.NotFound("Tasks execution not found")
|
|
|
return execution
|
|
|
@@ -234,7 +235,7 @@ class ConductorServerEndpoint(object):
|
|
|
LOG.info("Replica tasks execution created: %s", execution.id)
|
|
|
|
|
|
self._begin_tasks(ctxt, execution, replica.info)
|
|
|
- return self.get_replica_tasks_execution(ctxt, execution.id)
|
|
|
+ return self.get_replica_tasks_execution(ctxt, replica_id, execution.id)
|
|
|
|
|
|
def create_instances_replica(self, ctxt, origin, destination, instances):
|
|
|
replica = models.Replica()
|