Explorar o código

Adds synchronization in conductor

Alessandro Pilotti %!s(int64=9) %!d(string=hai) anos
pai
achega
e7f879e523
Modificáronse 1 ficheiros con 59 adicións e 0 borrados
  1. 59 0
      coriolis/conductor/rpc/server.py

+ 59 - 0
coriolis/conductor/rpc/server.py

@@ -1,8 +1,10 @@
 # Copyright 2016 Cloudbase Solutions Srl
 # All Rights Reserved.
 
+import functools
 import uuid
 
+from oslo_concurrency import lockutils
 from oslo_log import log as logging
 
 from coriolis import constants
@@ -17,6 +19,46 @@ VERSION = "1.0"
 LOG = logging.getLogger(__name__)
 
 
+def replica_synchronized(func):
+    @functools.wraps(func)
+    def wrapper(self, ctxt, replica_id, *args, **kwargs):
+        @lockutils.synchronized(replica_id)
+        def inner():
+            return func(self, ctxt, replica_id, *args, **kwargs)
+        return inner()
+    return wrapper
+
+
+def task_synchronized(func):
+    @functools.wraps(func)
+    def wrapper(self, ctxt, task_id, *args, **kwargs):
+        @lockutils.synchronized(task_id)
+        def inner():
+            return func(self, ctxt, task_id, *args, **kwargs)
+        return inner()
+    return wrapper
+
+
+def migration_synchronized(func):
+    @functools.wraps(func)
+    def wrapper(self, ctxt, migration_id, *args, **kwargs):
+        @lockutils.synchronized(migration_id)
+        def inner():
+            return func(self, ctxt, migration_id, *args, **kwargs)
+        return inner()
+    return wrapper
+
+
+def tasks_execution_synchronized(func):
+    @functools.wraps(func)
+    def wrapper(self, ctxt, execution_id, *args, **kwargs):
+        @lockutils.synchronized(execution_id)
+        def inner():
+            return func(self, ctxt, execution_id, *args, **kwargs)
+        return inner()
+    return wrapper
+
+
 class ConductorServerEndpoint(object):
     def __init__(self):
         self._rpc_worker_client = rpc_worker_client.WorkerClient()
@@ -44,6 +86,7 @@ class ConductorServerEndpoint(object):
                     instance=task.instance,
                     task_info=task_info.get(task.instance, {}))
 
+    @replica_synchronized
     def execute_replica_tasks(self, ctxt, replica_id, shutdown_instances):
         replica = self._get_replica(ctxt, replica_id)
         self._check_running_executions(replica)
@@ -86,15 +129,18 @@ class ConductorServerEndpoint(object):
         self._begin_tasks(ctxt, execution, replica.info)
         return self.get_replica_tasks_execution(ctxt, execution.id)
 
+    @replica_synchronized
     def get_replica_tasks_executions(self, ctxt, replica_id,
                                      include_tasks=False):
         return db_api.get_replica_tasks_executions(
             ctxt, replica_id, include_tasks)
 
+    @tasks_execution_synchronized
     def get_replica_tasks_execution(self, ctxt, execution_id):
         return self._get_replica_tasks_execution(
             ctxt, execution_id)
 
+    @tasks_execution_synchronized
     def delete_replica_tasks_execution(self, ctxt, execution_id):
         execution = self._get_replica_tasks_execution(
             ctxt, execution_id)
@@ -103,6 +149,7 @@ class ConductorServerEndpoint(object):
                 "Cannot delete a running replica tasks execution")
         db_api.delete_replica_tasks_execution(ctxt, execution_id)
 
+    @tasks_execution_synchronized
     def cancel_replica_tasks_execution(self, ctxt, execution_id):
         execution = self._get_replica_tasks_execution(
             ctxt, execution_id)
@@ -121,14 +168,17 @@ class ConductorServerEndpoint(object):
     def get_replicas(self, ctxt, include_tasks_executions=False):
         return db_api.get_replicas(ctxt, include_tasks_executions)
 
+    @replica_synchronized
     def get_replica(self, ctxt, replica_id):
         return self._get_replica(ctxt, replica_id)
 
+    @replica_synchronized
     def delete_replica(self, ctxt, replica_id):
         replica = self._get_replica(ctxt, replica_id)
         self._check_running_executions(replica)
         db_api.delete_replica(ctxt, replica_id)
 
+    @replica_synchronized
     def delete_replica_disks(self, ctxt, replica_id):
         replica = self._get_replica(ctxt, replica_id)
         self._check_running_executions(replica)
@@ -181,6 +231,7 @@ class ConductorServerEndpoint(object):
     def get_migrations(self, ctxt, include_tasks):
         return db_api.get_migrations(ctxt, include_tasks)
 
+    @migration_synchronized
     def get_migration(self, ctxt, migration_id):
         # the default serialization mechanism enforces a max_depth of 3
         return utils.to_dict(self._get_migration(ctxt, migration_id))
@@ -208,6 +259,7 @@ class ConductorServerEndpoint(object):
                 "A replica must have been executed succesfully in order "
                 "to be migrated")
 
+    @replica_synchronized
     def deploy_replica_instances(self, ctxt, replica_id, forced):
         replica = self._get_replica(ctxt, replica_id)
         self._check_running_executions(replica)
@@ -282,6 +334,7 @@ class ConductorServerEndpoint(object):
             raise exception.NotFound("Migration not found")
         return migration
 
+    @migration_synchronized
     def delete_migration(self, ctxt, migration_id):
         migration = self._get_migration(ctxt, migration_id)
         execution = migration.executions[0]
@@ -290,6 +343,7 @@ class ConductorServerEndpoint(object):
                 "Cannot delete a running migration")
         db_api.delete_migration(ctxt, migration_id)
 
+    @migration_synchronized
     def cancel_migration(self, ctxt, migration_id):
         migration = self._get_migration(ctxt, migration_id)
         execution = migration.executions[0]
@@ -312,6 +366,7 @@ class ConductorServerEndpoint(object):
         db_api.set_execution_status(
             ctxt, execution.id, constants.EXECUTION_STATUS_ERROR)
 
+    @task_synchronized
     def set_task_host(self, ctxt, task_id, host, process_id):
         db_api.set_task_host(ctxt, task_id, host, process_id)
         db_api.set_task_status(
@@ -339,6 +394,7 @@ class ConductorServerEndpoint(object):
                     task_info=task_info)
         return has_pending_tasks
 
+    @task_synchronized
     def task_completed(self, ctxt, task_id, task_info):
         LOG.info("Task completed: %s", task_id)
 
@@ -362,6 +418,7 @@ class ConductorServerEndpoint(object):
             db_api.set_execution_status(
                 ctxt, execution.id, constants.EXECUTION_STATUS_COMPLETED)
 
+    @task_synchronized
     def set_task_error(self, ctxt, task_id, exception_details):
         LOG.error("Task error: %(task_id)s - %(ex)s",
                   {"task_id": task_id, "ex": exception_details})
@@ -382,10 +439,12 @@ class ConductorServerEndpoint(object):
         db_api.set_execution_status(
             ctxt, execution.id, constants.EXECUTION_STATUS_ERROR)
 
+    @task_synchronized
     def task_event(self, ctxt, task_id, level, message):
         LOG.info("Task event: %s", task_id)
         db_api.add_task_event(ctxt, task_id, level, message)
 
+    @task_synchronized
     def task_progress_update(self, ctxt, task_id, current_step, total_steps,
                              message):
         LOG.info("Task progress update: %s", task_id)