Просмотр исходного кода

Add intermediary 'SCHEDULING' task state.

Nashwan Azhari 5 лет назад
Родитель
Сommit
e4c89cdfba

+ 6 - 3
coriolis/conductor/rpc/client.py

@@ -234,10 +234,13 @@ class ConductorClient(object):
         self._client.call(
             ctxt, 'cancel_migration', migration_id=migration_id, force=force)
 
-    def set_task_host(self, ctxt, task_id, host, process_id):
+    def set_task_host(self, ctxt, task_id, host):
         self._client.call(
-            ctxt, 'set_task_host', task_id=task_id, host=host,
-            process_id=process_id)
+            ctxt, 'set_task_host', task_id=task_id, host=host)
+
+    def set_task_process(self, ctxt, task_id, process_id):
+        self._client.call(
+            ctxt, 'set_task_process', task_id=task_id, process_id=process_id)
 
     def task_completed(self, ctxt, task_id, task_result):
         self._client.call(

+ 96 - 54
coriolis/conductor/rpc/server.py

@@ -604,7 +604,6 @@ class ConductorServerEndpoint(object):
                     constants.PROVIDER_PLATFORM_DESTINATION])
 
         worker_rpc = None
-        exceptions = []
         for i in range(retry_count):
             try:
                 LOG.debug(
@@ -624,26 +623,22 @@ class ConductorServerEndpoint(object):
             except Exception as ex:
                 LOG.warn(
                     "Failed to schedule task with ID '%s' (attempt %d/%d). "
-                    "waiting %d seconds and then retrying. Error was: %s",
-                    task.id, i+1, retry_count, utils.get_exception_details())
-                exceptions.append(ex)
+                    "Waiting %d seconds and then retrying. Error was: %s",
+                    task.id, i+1, retry_count, retry_period,
+                    utils.get_exception_details())
                 time.sleep(retry_period)
 
-        errors_str = ""
-        nerrors = len(exceptions)
-        for i, ex in enumerate(exceptions):
-            errors_str = "%s; (%d/%d) %s" % (
-                errors_str, i+1, nerrors, str(ex))
         message = (
-            "Failed to schedule task. This may indicate that there are no "
-            "Coriolis Worker services able to perform the task on the "
-            "platforms and in the Coriolis Regions required by the "
+            "Failed to schedule task %s after %d tries. This may indicate that"
+            " there are no Coriolis Worker services able to perform the task "
+            "on the platforms and in the Coriolis Regions required by the "
             "selected source/destination Coriolis Endpoints. Please review"
-            " the scheduler logs for more exact details. "
-            "Encountered errors were: %s" % errors_str[2:])
+            " the Conductor and Scheduler logs for more exact details." % (
+                task.id, retry_count))
         db_api.set_task_status(
             ctxt, task.id, constants.TASK_STATUS_FAILED_TO_SCHEDULE,
             exception_details=message)
+        raise exception.NoSuitableWorkerServiceError(message)
 
     def _begin_tasks(
             self, ctxt, execution, task_info={},
@@ -683,14 +678,13 @@ class ConductorServerEndpoint(object):
                         instance=task.instance,
                         task_info=task_info.get(task.instance, {}))
                 except Exception as ex:
-                    msg = (
+                    LOG.warn(
                         "Error occured while starting new task '%s'. "
-                        "Cancelling execution '%s'. Error was: %s" % (
-                            task.id, execution.id,
-                            utils.get_exception_details()))
+                        "Cancelling execution '%s'. Error was: %s",
+                        task.id, execution.id, utils.get_exception_details())
                     self._cancel_tasks_execution(
                         ctxt, execution, requery=True)
-                    raise exception.CoriolisException(msg) from ex
+                    raise
                 newly_started_tasks.append(task.id)
 
         # NOTE: this should never happen if _check_execution_tasks_sanity
@@ -1063,6 +1057,12 @@ class ConductorServerEndpoint(object):
 
     @staticmethod
     def _check_endpoints(ctxt, origin_endpoint, destination_endpoint):
+        if origin_endpoint.id == destination_endpoint.id:
+            raise exception.SameDestination(
+                "The origin and destination endpoints cannot be the same. "
+                "If you need to perform operations across two areas of "
+                "the same platform (ex: migrating across public cloud regions)"
+                ", please create two separate endpoints.")
         # TODO(alexpilotti): check Barbican secrets content as well
         if (origin_endpoint.connection_info ==
                 destination_endpoint.connection_info):
@@ -1078,8 +1078,8 @@ class ConductorServerEndpoint(object):
 
         replica = models.Replica()
         replica.id = str(uuid.uuid4())
-        replica.origin_endpoint = origin_endpoint
-        replica.destination_endpoint = destination_endpoint
+        replica.origin_endpoint_id = origin_endpoint_id
+        replica.destination_endpoint_id = destination_endpoint_id
         replica.destination_environment = destination_environment
         replica.source_environment = source_environment
         replica.instances = instances
@@ -1341,8 +1341,8 @@ class ConductorServerEndpoint(object):
 
         migration = models.Migration()
         migration.id = str(uuid.uuid4())
-        migration.origin_endpoint = origin_endpoint
-        migration.destination_endpoint = destination_endpoint
+        migration.origin_endpoint_id = origin_endpoint_id
+        migration.destination_endpoint_id = destination_endpoint_id
         migration.destination_environment = destination_environment
         migration.source_environment = source_environment
         migration.network_map = network_map
@@ -1623,8 +1623,24 @@ class ConductorServerEndpoint(object):
                 continue
 
             if task.status in (
-                    constants.TASK_STATUS_RUNNING,
-                    constants.TASK_STATUS_PENDING):
+                    constants.TASK_STATUS_PENDING,
+                    constants.TASK_STATUS_STARTING):
+                # any PENDING/STARTING tasks means that they did not have a
+                # host assigned to them yet, and presuming the host does not
+                # start executing the task until it marks itself as the runner,
+                # we can just mark the task as cancelled:
+                LOG.debug(
+                    "Setting currently '%s' task '%s' to '%s' as part of the "
+                    "cancellation of execution '%s'",
+                    task.status, task.id,
+                    constants.TASK_STATUS_UNSCHEDULED, execution.id)
+                db_api.set_task_status(
+                    ctxt, task.id, constants.TASK_STATUS_UNSCHEDULED,
+                    exception_details=(
+                        "This task was already pending execution but was "
+                        "unscheduled during the cancellation of the parent "
+                        "tasks execution."))
+            elif task.status == constants.TASK_STATUS_RUNNING:
                 # cancel any currently running/pending non-error tasks:
                 if not task.on_error:
                     LOG.debug(
@@ -1667,7 +1683,8 @@ class ConductorServerEndpoint(object):
                     "execution '%s'",
                     task.id, task.status, task.on_error, execution.id)
 
-        started_tasks = self._advance_execution_state(ctxt, execution)
+        started_tasks = self._advance_execution_state(
+            ctxt, execution, requery=True)
         if started_tasks:
             LOG.info(
                 "The following tasks were started after state advancement "
@@ -1688,11 +1705,11 @@ class ConductorServerEndpoint(object):
             keystone.delete_trust(ctxt)
 
     @parent_tasks_execution_synchronized
-    def set_task_host(self, ctxt, task_id, host, process_id):
-        """ Saves the ID of the worker host which has accepted and started
-        the task to the DB and marks the task as 'RUNNING'. """
+    def set_task_host(self, ctxt, task_id, host):
+        """ Saves the ID of the worker host which has accepted
+        the task to the DB and marks the task as STARTING. """
         task = db_api.get_task(ctxt, task_id)
-        new_status = constants.TASK_STATUS_RUNNING
+        new_status = constants.TASK_STATUS_STARTING
         exception_details = None
         if task.status == constants.TASK_STATUS_CANCELLING:
             raise exception.TaskIsCancelling(task_id=task_id)
@@ -1716,15 +1733,44 @@ class ConductorServerEndpoint(object):
                 "expected '%s' required for it to have a task host set." % (
                     task_id, task.status, constants.TASK_STATUS_PENDING))
         LOG.info(
-            "Setting host/process for task with ID '%s' to '%s/%s'",
-            task_id, host, process_id)
-        db_api.set_task_host(ctxt, task_id, host, process_id)
+            "Setting host for task with ID '%s' to '%s'", task_id, host)
+        db_api.set_task_host_properties(ctxt, task_id, host=host)
         db_api.set_task_status(
             ctxt, task_id, new_status,
             exception_details=exception_details)
         LOG.info(
-            "Successfully set host/process for task with ID '%s' to '%s/%s'",
-            task_id, host, process_id)
+            "Successfully set host for task with ID '%s' to '%s'",
+            task_id, host)
+
+    @parent_tasks_execution_synchronized
+    def set_task_process(self, ctxt, task_id, process_id):
+        """ Sets the ID of the Worker-side process for the given task,
+        and marks the task as actually 'RUNNING'. """
+        task = db_api.get_task(ctxt, task_id)
+        if not task.host:
+            raise exception.InvalidTaskState(
+                "Task with ID '%s' (current status '%s') has no host set "
+                "for it. Cannot set host process." % (
+                    task_id, task.status))
+        acceptable_statuses = [
+            constants.TASK_STATUS_STARTING,
+            constants.TASK_STATUS_CANCELLING_AFTER_COMPLETION]
+        if task.status not in acceptable_statuses:
+            raise exception.InvalidTaskState(
+                "Task with ID '%s' is in '%s' status instead of the "
+                "expected statuses (%s) required for it to have a task "
+                "process set." % (
+                    task_id, task.status, acceptable_statuses))
+
+        LOG.info(
+            "Setting process '%s' (host %s) for task '%s' and transitioning "
+            "it from status '%s' to '%s'", process_id, task.host, task_id,
+            task.status, constants.TASK_STATUS_RUNNING)
+        db_api.set_task_host_properties(ctxt, task_id, process_id=process_id)
+        db_api.set_task_status(ctxt, task_id, constants.TASK_STATUS_RUNNING)
+        LOG.info(
+            "Successfully set task process for task with ID '%s' to '%s'",
+            task_id, process_id)
 
     def _check_clean_execution_deadlock(
             self, ctxt, execution, task_statuses=None, requery=True):
@@ -1776,7 +1822,7 @@ class ConductorServerEndpoint(object):
 
     def _get_execution_status(self, ctxt, execution, requery=False):
         """ Returns the global status of an execution.
-        RUNNING - at least one task is RUNNING, PENDING or CANCELLING
+        RUNNING - at least one task is RUNNING, STARTING, PENDING or CANCELLING
         COMPLETED - all non-error-only tasks are COMPLETED
         CANCELED - no more RUNNING/PENDING/SCHEDULED tasks but some CANCELED
         CANCELIING - at least one task in CANCELLING status
@@ -1797,7 +1843,9 @@ class ConductorServerEndpoint(object):
                 is_running = True
             if task.status in constants.CANCELED_TASK_STATUSES:
                 is_canceled = True
-            if task.status == constants.TASK_STATUS_ERROR:
+            if task.status in (
+                    constants.TASK_STATUS_ERROR,
+                    constants.TASK_STATUS_FAILED_TO_SCHEDULE):
                 is_errord = True
             if task.status in (
                     constants.TASK_STATUS_CANCELLING,
@@ -1925,17 +1973,16 @@ class ConductorServerEndpoint(object):
                     "Successfully started task with ID '%s' (type '%s') "
                     "for execution '%s'", task.id, task.task_type,
                     execution.id)
+                started_tasks.append(task.id)
+                return constants.TASK_STATUS_PENDING
             except Exception as ex:
-                msg = (
+                LOG.warn(
                     "Error occured while starting new task '%s'. "
-                    "Cancelling execution '%s'. Error was: %s" % (
-                        task.id, execution.id,
-                        utils.get_exception_details()))
+                    "Cancelling execution '%s'. Error was: %s",
+                    task.id, execution.id, utils.get_exception_details())
                 self._cancel_tasks_execution(
                     ctxt, execution, requery=True)
-                raise exception.CoriolisException(msg) from ex
-
-            started_tasks.append(task.id)
+                raise
 
         # aggregate all tasks and statuses:
         task_statuses = {}
@@ -1955,7 +2002,7 @@ class ConductorServerEndpoint(object):
         LOG.debug(
             "All task statuses before execution '%s' lifecycle iteration "
             "(for tasks of instance '%s'): %s",
-            instance, execution.id, task_statuses)
+            execution.id, instance, task_statuses)
 
         # NOTE: the tasks are saved in a random order in the DB, which
         # complicates the processing logic so we just pre-sort:
@@ -1967,8 +2014,7 @@ class ConductorServerEndpoint(object):
                 if not task_deps[task.id]:
                     LOG.info(
                         "Starting depency-less task '%s'", task.id)
-                    _start_task(task)
-                    task_statuses[task.id] = constants.TASK_STATUS_PENDING
+                    task_statuses[task.id] = _start_task(task)
                     continue
 
                 parent_task_statuses = {
@@ -2009,9 +2055,7 @@ class ConductorServerEndpoint(object):
                                 "Starting task '%s' as all dependencies have "
                                 "completed successfully: %s",
                                 task.id, parent_task_statuses)
-                            _start_task(task)
-                            task_statuses[task.id] = (
-                                constants.TASK_STATUS_PENDING)
+                            task_statuses[task.id] = _start_task(task)
                         else:
                             # it means one/more parents error'd/unscheduled
                             # so we mark this task as unscheduled:
@@ -2045,9 +2089,7 @@ class ConductorServerEndpoint(object):
                                 "non-error parent (%s) was completed: %s",
                                 task.id, list(non_error_parents.keys()),
                                 parent_task_statuses)
-                            _start_task(task)
-                            task_statuses[task.id] = (
-                                constants.TASK_STATUS_PENDING)
+                            task_statuses[task.id] = _start_task(task)
                         else:
                             LOG.info(
                                 "Unscheduling on-error task '%s' as none of "
@@ -2340,7 +2382,7 @@ class ConductorServerEndpoint(object):
                 ctxt, task, execution, updated_task_info)
 
             newly_started_tasks = self._advance_execution_state(
-                ctxt, execution, instance=task.instance)
+                ctxt, execution, instance=task.instance, requery=False)
             if newly_started_tasks:
                 LOG.info(
                     "The following tasks were started for execution '%s' "

+ 2 - 0
coriolis/constants.py

@@ -26,6 +26,7 @@ FINALIZED_EXECUTION_STATUSES = [
 
 TASK_STATUS_SCHEDULED = "SCHEDULED"
 TASK_STATUS_PENDING = "PENDING"
+TASK_STATUS_STARTING = "STARTING"
 TASK_STATUS_UNSCHEDULED = "UNSCHEDULED"
 TASK_STATUS_RUNNING = "RUNNING"
 TASK_STATUS_COMPLETED = "COMPLETED"
@@ -42,6 +43,7 @@ TASK_STATUS_FAILED_TO_SCHEDULE = "FAILED_TO_SCHEDULE"
 
 ACTIVE_TASK_STATUSES = [
     TASK_STATUS_PENDING,
+    TASK_STATUS_STARTING,
     TASK_STATUS_RUNNING,
     TASK_STATUS_CANCELLING,
     TASK_STATUS_CANCELLING_AFTER_COMPLETION

+ 5 - 3
coriolis/db/api.py

@@ -666,10 +666,12 @@ def set_task_status(context, task_id, status, exception_details=None):
 
 
 @enginefacade.writer
-def set_task_host(context, task_id, host, process_id):
+def set_task_host_properties(context, task_id, host=None, process_id=None):
     task = _get_task(context, task_id)
-    task.host = host
-    task.process_id = process_id
+    if host:
+        task.host = host
+    if process_id:
+        task.process_id = process_id
 
 
 @enginefacade.reader

+ 25 - 7
coriolis/worker/rpc/server.py

@@ -117,6 +117,9 @@ class WorkerServerEndpoint(object):
         return status
 
     def cancel_task(self, ctxt, task_id, process_id, force):
+        LOG.debug(
+            "Received request to cancel task '%s' (process %s)",
+            task_id, process_id)
         if not force and os.name == "nt":
             LOG.warn("Windows does not support SIGINT, performing a "
                      "forced task termination")
@@ -138,8 +141,6 @@ class WorkerServerEndpoint(object):
                 "completed/error'd." % (
                     process_id, task_id))
             LOG.error(msg)
-            self._rpc_conductor_client.confirm_task_cancellation(
-                ctxt, task_id, msg)
 
     def _handle_mp_log_events(self, p, mp_log_q):
         while True:
@@ -225,13 +226,25 @@ class WorkerServerEndpoint(object):
         extra_library_paths = self._get_extra_library_paths_for_providers(
             ctxt, task_id, task_type, origin, destination)
 
-        self._start_process_with_custom_library_paths(p, extra_library_paths)
-        LOG.info("Task process started: %s", task_id)
-        LOG.debug(
-            "Attempting to set task host on Conductor for task '%s'.", task_id)
         try:
+            LOG.debug(
+                "Attempting to set task host on Conductor for task '%s'.",
+                task_id)
             self._rpc_conductor_client.set_task_host(
-                ctxt, task_id, self._server, p.pid)
+                ctxt, task_id, self._server)
+            LOG.debug(
+                "Attempting to start process for task with ID '%s'", task_id)
+            self._start_process_with_custom_library_paths(
+                p, extra_library_paths)
+            LOG.info("Task process started: %s", task_id)
+            LOG.debug(
+                "Attempting to set task process on Conductor for task '%s'.",
+                task_id)
+            self._rpc_conductor_client.set_task_process(
+                ctxt, task_id, p.pid)
+            LOG.debug(
+                "Successfully started and retported task process for task "
+                "with ID '%s'.", task_id)
         except (Exception, KeyboardInterrupt) as ex:
             LOG.debug(
                 "Exception occurred whilst setting host for task '%s'. Error "
@@ -288,6 +301,11 @@ class WorkerServerEndpoint(object):
             LOG.exception(ex)
             self._rpc_conductor_client.confirm_task_cancellation(
                 ctxt, task_id, str(ex))
+        except exception.NoSuitableWorkerServiceError as ex:
+            LOG.warn(
+                "A conductor-side scheduling error has occurred following the "
+                "completion of task '%s'. Ignoring. Error was: %s",
+                task_id, utils.get_exception_details())
         except Exception as ex:
             LOG.debug(
                 "Task with ID '%s' has error'd out. Reporting error to "