Просмотр исходного кода

Account for workers failing to receive task cancellation requests.

Nashwan Azhari 5 лет назад
Родитель
Сommit
25b54082ba
2 измененных файлов с 77 добавлено и 11 удалено
  1. 72 9
      coriolis/conductor/rpc/server.py
  2. 5 2
      coriolis/constants.py

+ 72 - 9
coriolis/conductor/rpc/server.py

@@ -262,7 +262,7 @@ class ConductorServerEndpoint(object):
         topic = constants.SERVICE_MESSAGING_TOPIC_FORMAT % ({
             "main_topic": constants.WORKER_MAIN_MESSAGING_TOPIC,
             "host": host})
-        return rpc_client_class(topic=topic, *client_args, **client_kwargs)
+        return rpc_client_class(*client_args, topic=topic, **client_kwargs)
 
     def _get_worker_service_rpc_for_specs(
             self, ctxt, provider_requirements=None, region_sets=None,
@@ -2445,7 +2445,7 @@ class ConductorServerEndpoint(object):
 
         # iterate through and kill/cancel any non-error
         # tasks which are running/pending:
-        for task in execution.tasks:
+        for task in sorted(execution.tasks, key=lambda t: t.index):
 
             # if force is provided, force-cancel tasks directly:
             if force and task.status in constants.ACTIVE_TASK_STATUSES:
@@ -2482,14 +2482,35 @@ class ConductorServerEndpoint(object):
                 # cancel any currently running/pending non-error tasks:
                 if not task.on_error:
                     LOG.debug(
-                        "Killing %s non-error task '%s' as part of "
-                        "cancellation of execution '%s'",
+                        "Sending cancellation request for  %s non-error task  "
+                        "'%s' as part of cancellation of execution '%s'",
                         task.status, task.id, execution.id)
                     db_api.set_task_status(
                         ctxt, task.id, constants.TASK_STATUS_CANCELLING)
-                    worker_rpc = self._get_worker_rpc_for_host(task.host)
-                    worker_rpc.cancel_task(
-                        ctxt, task.host, task.id, task.process_id, force)
+                    try:
+                        worker_rpc = self._get_worker_rpc_for_host(
+                            # NOTE: we intetionally lowball the timeout for the
+                            # cancellation call to prevent the conductor from
+                            # hanging an excessive amount of time:
+                            task.host, timeout=10)
+                        worker_rpc.cancel_task(
+                            ctxt, task.host, task.id, task.process_id, force)
+                    except (Exception, KeyboardInterrupt):
+                        msg = (
+                            "Failed to send cancellation request for task '%s'"
+                            "to worker host '%s' as part of cancellation of "
+                            "execution '%s'. Marking task as '%s' for now and "
+                            "awaiting any eventual worker replies later." % (
+                                task.id, task.host, execution.id,
+                                constants.TASK_STATUS_FAILED_TO_CANCEL))
+                        LOG.error(
+                            "%s. Exception was: %s", msg,
+                            utils.get_exception_details())
+                        db_api.set_task_status(
+                            ctxt, task.id,
+                            constants.TASK_STATUS_FAILED_TO_CANCEL,
+                            exception_details=msg)
+
                 # let any on-error tasks run to completion but mark
                 # them as CANCELLING_AFTER_COMPLETION so they will
                 # be marked as cancelled once they are completed:
@@ -3353,6 +3374,22 @@ class ConductorServerEndpoint(object):
                     "completion. Please review the worker logs for "
                     "more relevant details." % (
                         task.host)))
+        elif task.status == constants.TASK_STATUS_FAILED_TO_CANCEL:
+            LOG.error(
+                "Received confirmation that presumably '%s' task '%s' has just "
+                "completed successfully. Marking as '%s' and processing its "
+                "result as if it had completed normally.",
+                task.status, task.id,
+                constants.TASK_STATUS_CANCELED_AFTER_COMPLETION)
+            db_api.set_task_status(
+                ctxt, task_id, constants.TASK_STATUS_CANCELED_AFTER_COMPLETION,
+                exception_details=(
+                    "The worker host for this task ('%s') had not either not "
+                    "accepted task cancellation request when it was asked to "
+                    "or had failed to receive the request, so this task was "
+                    "run to completion. Please review the worker logs for "
+                    "more relevant details." % (
+                        task.host)))
         elif task.status in constants.FINALIZED_TASK_STATUSES:
             LOG.error(
                 "Received confirmation that presumably finalized task '%s' "
@@ -3473,13 +3510,27 @@ class ConductorServerEndpoint(object):
             # it means a force cancel has been issued before the
             # confirmation that the task was canceled came in:
             LOG.warn(
-                "Only just received error confirmation for force-cancelled "
-                "task '%s'. Leaving marked as force-cancelled.", task.id)
+                "Only just received cancellation confirmation for "
+                "force-canceled task '%s'. Leaving marked as "
+                "force-cancelled.", task.id)
             final_status = constants.TASK_STATUS_FORCE_CANCELED
             exception_details = (
                 "This task was force-cancelled. Confirmation of its "
                 "cancellation did eventually come in. Additional details on "
                 "the cancellation: %s" % cancellation_details)
+        elif task.status == constants.TASK_STATUS_FAILED_TO_CANCEL:
+            final_status = constants.TASK_STATUS_CANCELED
+            LOG.warn(
+                "Only just received cancellation confirmation for task '%s' "
+                "despite it being marked as '%s'. Marking as '%s' anyway. "
+                "Error reported by worker was: %s",
+                task.id, task.status, final_status, exception_details)
+            exception_details = (
+                "The worker service for this task (%s) had either failed to "
+                "cancel this task when it was initially asked to or did not "
+                "receive the cancellation request, but it did eventually "
+                "confirm the task's cancellation with the following "
+                "details: %s" % cancellation_details)
         elif task.status in constants.FINALIZED_TASK_STATUSES:
             LOG.warn(
                 "Received confirmation of cancellation for already finalized "
@@ -3548,6 +3599,18 @@ class ConductorServerEndpoint(object):
             exception_details = (
                 "This task was force-cancelled but ended up errorring anyway. "
                 "The error details were: '%s'" % exception_details)
+        elif task.status == constants.TASK_STATUS_FAILED_TO_CANCEL:
+            final_status = constants.TASK_STATUS_CANCELED
+            LOG.warn(
+                "Only just received error confirmation for task '%s' despite "
+                "it being marked as '%s'. Marking as '%s' anyway. Error "
+                "reported by worker was: %s",
+                task.id, task.status, final_status, exception_details)
+            exception_details = (
+                "The worker service for this task (%s) has failed to cancel "
+                "this task when it was asked to, and the task eventually "
+                "exited with an error. The error details were: %s" % (
+                    task.id, exception_details))
         elif task.status in constants.FINALIZED_TASK_STATUSES:
             LOG.error(
                 "Error confirmation on task '%s' arrived despite it being "

+ 5 - 2
coriolis/constants.py

@@ -33,6 +33,7 @@ TASK_STATUS_RUNNING = "RUNNING"
 TASK_STATUS_COMPLETED = "COMPLETED"
 TASK_STATUS_ERROR = "ERROR"
 TASK_STATUS_FORCE_CANCELED = "FORCE_CANCELED"
+TASK_STATUS_FAILED_TO_CANCEL = "FAILED_TO_CANCEL"
 TASK_STATUS_CANCELED = "CANCELED"
 TASK_STATUS_CANCELED_AFTER_COMPLETION = "CANCELED_AFTER_COMPLETION"
 TASK_STATUS_CANCELLING = "CANCELLING"
@@ -57,7 +58,8 @@ CANCELED_TASK_STATUSES = [
     TASK_STATUS_CANCELED_AFTER_COMPLETION,
     TASK_STATUS_CANCELED_FOR_DEBUGGING,
     TASK_STATUS_CANCELED_FROM_DEADLOCK,
-    TASK_STATUS_FAILED_TO_SCHEDULE
+    TASK_STATUS_FAILED_TO_SCHEDULE,
+    TASK_STATUS_FAILED_TO_CANCEL
 ]
 
 FINALIZED_TASK_STATUSES = [
@@ -69,7 +71,8 @@ FINALIZED_TASK_STATUSES = [
     TASK_STATUS_CANCELED_FOR_DEBUGGING,
     TASK_STATUS_CANCELED_FROM_DEADLOCK,
     TASK_STATUS_CANCELED_AFTER_COMPLETION,
-    TASK_STATUS_FAILED_TO_SCHEDULE
+    TASK_STATUS_FAILED_TO_SCHEDULE,
+    TASK_STATUS_FAILED_TO_CANCEL
 ]
 
 TASK_TYPE_DEPLOY_MIGRATION_SOURCE_RESOURCES = (