瀏覽代碼

Configure cross-process locks for conductor.

Nashwan Azhari 6 年之前
父節點
當前提交
43fc27232f
共有 5 個文件被更改,包括 89 次插入23 次删除
  1. 1 0
      coriolis/cmd/conductor.py
  2. 33 21
      coriolis/conductor/rpc/server.py
  3. 41 0
      coriolis/service.py
  4. 12 0
      coriolis/tasks/replica_tasks.py
  5. 2 2
      coriolis/worker/rpc/server.py

+ 1 - 0
coriolis/cmd/conductor.py

@@ -16,6 +16,7 @@ def main():
     CONF(sys.argv[1:], project='coriolis',
     CONF(sys.argv[1:], project='coriolis',
          version="1.0.0")
          version="1.0.0")
     utils.setup_logging()
     utils.setup_logging()
+    service.check_locks_dir_empty()
 
 
     server = service.MessagingService(
     server = service.MessagingService(
         'coriolis_conductor', [rpc_server.ConductorServerEndpoint()],
         'coriolis_conductor', [rpc_server.ConductorServerEndpoint()],

+ 33 - 21
coriolis/conductor/rpc/server.py

@@ -3,7 +3,6 @@
 
 
 import copy
 import copy
 import functools
 import functools
-import itertools
 import uuid
 import uuid
 
 
 from oslo_concurrency import lockutils
 from oslo_concurrency import lockutils
@@ -49,7 +48,8 @@ def endpoint_synchronized(func):
     @functools.wraps(func)
     @functools.wraps(func)
     def wrapper(self, ctxt, endpoint_id, *args, **kwargs):
     def wrapper(self, ctxt, endpoint_id, *args, **kwargs):
         @lockutils.synchronized(
         @lockutils.synchronized(
-            constants.ENDPOINT_LOCK_NAME_FORMAT % endpoint_id)
+            constants.ENDPOINT_LOCK_NAME_FORMAT % endpoint_id,
+            external=True)
         def inner():
         def inner():
             return func(self, ctxt, endpoint_id, *args, **kwargs)
             return func(self, ctxt, endpoint_id, *args, **kwargs)
         return inner()
         return inner()
@@ -60,7 +60,8 @@ def replica_synchronized(func):
     @functools.wraps(func)
     @functools.wraps(func)
     def wrapper(self, ctxt, replica_id, *args, **kwargs):
     def wrapper(self, ctxt, replica_id, *args, **kwargs):
         @lockutils.synchronized(
         @lockutils.synchronized(
-            constants.REPLICA_LOCK_NAME_FORMAT % replica_id)
+            constants.REPLICA_LOCK_NAME_FORMAT % replica_id,
+            external=True)
         def inner():
         def inner():
             return func(self, ctxt, replica_id, *args, **kwargs)
             return func(self, ctxt, replica_id, *args, **kwargs)
         return inner()
         return inner()
@@ -71,7 +72,8 @@ def schedule_synchronized(func):
     @functools.wraps(func)
     @functools.wraps(func)
     def wrapper(self, ctxt, replica_id, schedule_id, *args, **kwargs):
     def wrapper(self, ctxt, replica_id, schedule_id, *args, **kwargs):
         @lockutils.synchronized(
         @lockutils.synchronized(
-                constants.SCHEDULE_LOCK_NAME_FORMAT % schedule_id)
+            constants.SCHEDULE_LOCK_NAME_FORMAT % schedule_id,
+            external=True)
         def inner():
         def inner():
             return func(self, ctxt, replica_id, schedule_id, *args, **kwargs)
             return func(self, ctxt, replica_id, schedule_id, *args, **kwargs)
         return inner()
         return inner()
@@ -82,21 +84,24 @@ def task_synchronized(func):
     @functools.wraps(func)
     @functools.wraps(func)
     def wrapper(self, ctxt, task_id, *args, **kwargs):
     def wrapper(self, ctxt, task_id, *args, **kwargs):
         @lockutils.synchronized(
         @lockutils.synchronized(
-                constants.TASK_LOCK_NAME_FORMAT % task_id)
+            constants.TASK_LOCK_NAME_FORMAT % task_id,
+            external=True)
         def inner():
         def inner():
             return func(self, ctxt, task_id, *args, **kwargs)
             return func(self, ctxt, task_id, *args, **kwargs)
         return inner()
         return inner()
     return wrapper
     return wrapper
 
 
 
 
-def task_and_execution_synchronized(func):
+def parent_tasks_execution_synchronized(func):
     @functools.wraps(func)
     @functools.wraps(func)
     def wrapper(self, ctxt, task_id, *args, **kwargs):
     def wrapper(self, ctxt, task_id, *args, **kwargs):
         task = db_api.get_task(ctxt, task_id)
         task = db_api.get_task(ctxt, task_id)
         @lockutils.synchronized(
         @lockutils.synchronized(
-            constants.EXECUTION_LOCK_NAME_FORMAT % task.execution_id)
+            constants.EXECUTION_LOCK_NAME_FORMAT % task.execution_id,
+            external=True)
         @lockutils.synchronized(
         @lockutils.synchronized(
-            constants.TASK_LOCK_NAME_FORMAT % task_id)
+            constants.TASK_LOCK_NAME_FORMAT % task_id,
+            external=True)
         def inner():
         def inner():
             return func(self, ctxt, task_id, *args, **kwargs)
             return func(self, ctxt, task_id, *args, **kwargs)
         return inner()
         return inner()
@@ -107,7 +112,8 @@ def migration_synchronized(func):
     @functools.wraps(func)
     @functools.wraps(func)
     def wrapper(self, ctxt, migration_id, *args, **kwargs):
     def wrapper(self, ctxt, migration_id, *args, **kwargs):
         @lockutils.synchronized(
         @lockutils.synchronized(
-            constants.MIGRATION_LOCK_NAME_FORMAT % migration_id)
+            constants.MIGRATION_LOCK_NAME_FORMAT % migration_id,
+            external=True)
         def inner():
         def inner():
             return func(self, ctxt, migration_id, *args, **kwargs)
             return func(self, ctxt, migration_id, *args, **kwargs)
         return inner()
         return inner()
@@ -118,7 +124,8 @@ def tasks_execution_synchronized(func):
     @functools.wraps(func)
     @functools.wraps(func)
     def wrapper(self, ctxt, replica_id, execution_id, *args, **kwargs):
     def wrapper(self, ctxt, replica_id, execution_id, *args, **kwargs):
         @lockutils.synchronized(
         @lockutils.synchronized(
-            constants.EXECUTION_LOCK_NAME_FORMAT % execution_id)
+            constants.EXECUTION_LOCK_NAME_FORMAT % execution_id,
+            external=True)
         def inner():
         def inner():
             return func(self, ctxt, replica_id, execution_id, *args, **kwargs)
             return func(self, ctxt, replica_id, execution_id, *args, **kwargs)
         return inner()
         return inner()
@@ -1225,7 +1232,8 @@ class ConductorServerEndpoint(object):
                 "force option if you'd like to force-cancel it.")
                 "force option if you'd like to force-cancel it.")
 
 
         with lockutils.lock(
         with lockutils.lock(
-                constants.EXECUTION_LOCK_NAME_FORMAT % execution.id):
+                constants.EXECUTION_LOCK_NAME_FORMAT % execution.id,
+                external=True):
             self._cancel_tasks_execution(ctxt, execution, force=force)
             self._cancel_tasks_execution(ctxt, execution, force=force)
         self._check_delete_reservation_for_transfer(migration)
         self._check_delete_reservation_for_transfer(migration)
 
 
@@ -1344,7 +1352,7 @@ class ConductorServerEndpoint(object):
         if ctxt.delete_trust_id:
         if ctxt.delete_trust_id:
             keystone.delete_trust(ctxt)
             keystone.delete_trust(ctxt)
 
 
-    @task_and_execution_synchronized
+    @parent_tasks_execution_synchronized
     def set_task_host(self, ctxt, task_id, host, process_id):
     def set_task_host(self, ctxt, task_id, host, process_id):
         """ Saves the ID of the worker host which has accepted and started
         """ Saves the ID of the worker host which has accepted and started
         the task to the DB and marks the task as 'RUNNING'. """
         the task to the DB and marks the task as 'RUNNING'. """
@@ -1755,7 +1763,8 @@ class ConductorServerEndpoint(object):
         replica_id = migration.replica_id
         replica_id = migration.replica_id
 
 
         with lockutils.lock(
         with lockutils.lock(
-                constants.REPLICA_LOCK_NAME_FORMAT % replica_id):
+                constants.REPLICA_LOCK_NAME_FORMAT % replica_id,
+                external=True):
             LOG.debug(
             LOG.debug(
                 "Updating volume_info in replica due to snapshot "
                 "Updating volume_info in replica due to snapshot "
                 "restore during migration. replica id: %s", replica_id)
                 "restore during migration. replica id: %s", replica_id)
@@ -1870,7 +1879,7 @@ class ConductorServerEndpoint(object):
                 "No post-task actions required for task '%s' of type '%s'",
                 "No post-task actions required for task '%s' of type '%s'",
                 task.id, task_type)
                 task.id, task_type)
 
 
-    @task_and_execution_synchronized
+    @parent_tasks_execution_synchronized
     def task_completed(self, ctxt, task_id, task_result):
     def task_completed(self, ctxt, task_id, task_result):
         LOG.info("Task completed: %s", task_id)
         LOG.info("Task completed: %s", task_id)
 
 
@@ -1934,7 +1943,8 @@ class ConductorServerEndpoint(object):
         execution = db_api.get_tasks_execution(ctxt, task.execution_id)
         execution = db_api.get_tasks_execution(ctxt, task.execution_id)
         with lockutils.lock(
         with lockutils.lock(
                 constants.EXECUTION_TYPE_TO_ACTION_LOCK_NAME_FORMAT_MAP[
                 constants.EXECUTION_TYPE_TO_ACTION_LOCK_NAME_FORMAT_MAP[
-                    execution.type] % execution.action_id):
+                    execution.type] % execution.action_id,
+                external=True):
             action_id = execution.action_id
             action_id = execution.action_id
             action = db_api.get_action(ctxt, action_id)
             action = db_api.get_action(ctxt, action_id)
 
 
@@ -1999,7 +2009,7 @@ class ConductorServerEndpoint(object):
                     ctxt, subtask.id,
                     ctxt, subtask.id,
                     constants.TASK_STATUS_CANCELED_FOR_DEBUGGING)
                     constants.TASK_STATUS_CANCELED_FOR_DEBUGGING)
 
 
-    @tasks_execution_synchronized
+    @parent_tasks_execution_synchronized
     def confirm_task_cancellation(self, ctxt, task_id, cancellation_details):
     def confirm_task_cancellation(self, ctxt, task_id, cancellation_details):
         LOG.info(
         LOG.info(
             "Received confirmation of cancellation for task '%s': %s",
             "Received confirmation of cancellation for task '%s': %s",
@@ -2009,7 +2019,7 @@ class ConductorServerEndpoint(object):
         final_status = constants.TASK_STATUS_CANCELED
         final_status = constants.TASK_STATUS_CANCELED
         exception_details = (
         exception_details = (
             "This task was user-cancelled. Additional cancellation "
             "This task was user-cancelled. Additional cancellation "
-            "info: '%s'", cancellation_details)
+            "info from worker service: '%s'" % cancellation_details)
         if task.status == constants.TASK_STATUS_CANCELLING_AFTER_COMPLETION:
         if task.status == constants.TASK_STATUS_CANCELLING_AFTER_COMPLETION:
             LOG.error(
             LOG.error(
                 "Received cancellation confirmation for task '%s' which was "
                 "Received cancellation confirmation for task '%s' which was "
@@ -2036,8 +2046,9 @@ class ConductorServerEndpoint(object):
 
 
         if final_status == task.status:
         if final_status == task.status:
             LOG.debug(
             LOG.debug(
-                "NOT altering state of task '%s' ('%s') following confirmation"
-                " of cancellation. Updating its exception details though.")
+                "NOT altering state of finalized task '%s' ('%s') following "
+                "confirmation of cancellation. Updating its exception "
+                "details though: %s", task.id, task.status, exception_details)
             db_api.set_task_status(
             db_api.set_task_status(
                 ctxt, task.id, final_status,
                 ctxt, task.id, final_status,
                 exception_details=exception_details)
                 exception_details=exception_details)
@@ -2052,7 +2063,7 @@ class ConductorServerEndpoint(object):
             execution = db_api.get_tasks_execution(ctxt, task.execution_id)
             execution = db_api.get_tasks_execution(ctxt, task.execution_id)
             self._advance_execution_state(ctxt, execution, requery=False)
             self._advance_execution_state(ctxt, execution, requery=False)
 
 
-    @task_and_execution_synchronized
+    @parent_tasks_execution_synchronized
     def set_task_error(self, ctxt, task_id, exception_details):
     def set_task_error(self, ctxt, task_id, exception_details):
         LOG.error(
         LOG.error(
             "Received error confirmation for task: %(task_id)s - %(ex)s",
             "Received error confirmation for task: %(task_id)s - %(ex)s",
@@ -2115,7 +2126,8 @@ class ConductorServerEndpoint(object):
         action = db_api.get_action(ctxt, action_id)
         action = db_api.get_action(ctxt, action_id)
         with lockutils.lock(
         with lockutils.lock(
                 constants.EXECUTION_TYPE_TO_ACTION_LOCK_NAME_FORMAT_MAP[
                 constants.EXECUTION_TYPE_TO_ACTION_LOCK_NAME_FORMAT_MAP[
-                    execution.type] % action_id):
+                    execution.type] % action_id,
+                external=True):
             if task.task_type == constants.TASK_TYPE_OS_MORPHING and (
             if task.task_type == constants.TASK_TYPE_OS_MORPHING and (
                     CONF.conductor.debug_os_morphing_errors):
                     CONF.conductor.debug_os_morphing_errors):
                 LOG.debug(
                 LOG.debug(

+ 41 - 0
coriolis/service.py

@@ -1,6 +1,7 @@
 # Copyright 2016 Cloudbase Solutions Srl
 # Copyright 2016 Cloudbase Solutions Srl
 # All Rights Reserved.
 # All Rights Reserved.
 
 
+import os
 import platform
 import platform
 
 
 from oslo_concurrency import processutils
 from oslo_concurrency import processutils
@@ -34,6 +35,46 @@ CONF.register_opts(service_opts)
 LOG = logging.getLogger(__name__)
 LOG = logging.getLogger(__name__)
 
 
 
 
+def check_locks_dir_empty():
+    """ Checks whether the locks dir is empty and warns otherwise.
+
+    NOTE: external oslo_concurrency locks work based on listing open file
+    descriptors so this check is not necessarily conclusive, though all freshly
+    started/restarted conductor services should ideally be given a clean slate.
+    """
+    oslo_concurrency_group = getattr(CONF, 'oslo_concurrency', {})
+    if not oslo_concurrency_group:
+        LOG.warn("No 'oslo_concurrency' group defined in config file!")
+        return
+
+    locks_dir = oslo_concurrency_group.get('lock_path', "")
+    if not locks_dir:
+        LOG.warn("No locks directory path was configured!")
+        return
+
+    if not os.path.exists(locks_dir):
+        LOG.warn(
+            "Configured 'lock_path' directory '%s' does NOT exist!", locks_dir)
+        return
+
+    if not os.path.isdir(locks_dir):
+        LOG.warn(
+            "Configured 'lock_path' directory '%s' is NOT a directory!",
+            locks_dir)
+        return
+
+    locks_dir_contents = os.listdir(locks_dir)
+    if locks_dir_contents:
+        LOG.warn(
+            "Configured 'lock_path' directory '%s' is NOT empty: %s",
+            locks_dir, locks_dir_contents)
+        return
+
+    LOG.info(
+        "Successfully checked 'lock_path' directory '%s' exists and is empty.",
+        locks_dir)
+
+
 class WSGIService(service.ServiceBase):
 class WSGIService(service.ServiceBase):
     def __init__(self, name):
     def __init__(self, name):
         self._host = CONF.api_migration_listen
         self._host = CONF.api_migration_listen

+ 12 - 0
coriolis/tasks/replica_tasks.py

@@ -792,6 +792,12 @@ class UpdateSourceReplicaTask(base.TaskRunner):
         if volumes_info:
         if volumes_info:
             schemas.validate_value(
             schemas.validate_value(
                 volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)
                 volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)
+        else:
+            LOG.warn(
+                "Source update method for '%s' source provider did NOT "
+                "return any volumes info. Defaulting to old value.",
+                origin["type"])
+            volumes_info = task_info.get("volumes_info", [])
 
 
         return {
         return {
             "volumes_info": volumes_info,
             "volumes_info": volumes_info,
@@ -849,6 +855,12 @@ class UpdateDestinationReplicaTask(base.TaskRunner):
                 volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)
                 volumes_info, schemas.CORIOLIS_VOLUMES_INFO_SCHEMA)
             volumes_info = _check_ensure_volumes_info_ordering(
             volumes_info = _check_ensure_volumes_info_ordering(
                 export_info, volumes_info)
                 export_info, volumes_info)
+        else:
+            LOG.warn(
+                "Destination update method for '%s' dest provider did NOT "
+                "return any volumes info. Defaulting to old value.",
+                destination["type"])
+            volumes_info = task_info.get("volumes_info", [])
 
 
         return {
         return {
             "volumes_info": volumes_info,
             "volumes_info": volumes_info,

+ 2 - 2
coriolis/worker/rpc/server.py

@@ -96,7 +96,7 @@ class WorkerServerEndpoint(object):
                     process_id, task_id))
                     process_id, task_id))
             LOG.error(msg)
             LOG.error(msg)
             self._rpc_conductor_client.confirm_task_cancellation(
             self._rpc_conductor_client.confirm_task_cancellation(
-                ctxt, task_id, cancellation_details=msg)
+                ctxt, task_id, msg)
 
 
     def _handle_mp_log_events(self, p, mp_log_q):
     def _handle_mp_log_events(self, p, mp_log_q):
         while True:
         while True:
@@ -228,7 +228,7 @@ class WorkerServerEndpoint(object):
         except exception.TaskProcessCanceledException as ex:
         except exception.TaskProcessCanceledException as ex:
             LOG.exception(ex)
             LOG.exception(ex)
             self._rpc_conductor_client.confirm_task_cancellation(
             self._rpc_conductor_client.confirm_task_cancellation(
-                ctxt, task_id, cancellation_details=str(ex))
+                ctxt, task_id, str(ex))
         except Exception as ex:
         except Exception as ex:
             LOG.exception(ex)
             LOG.exception(ex)
             self._rpc_conductor_client.set_task_error(ctxt, task_id, str(ex))
             self._rpc_conductor_client.set_task_error(ctxt, task_id, str(ex))