Răsfoiți Sursa

Adds on_error tasks execution

Alessandro Pilotti 9 ani în urmă
părinte
comite
3555edc005

+ 63 - 37
coriolis/conductor/rpc/server.py

@@ -64,7 +64,8 @@ class ConductorServerEndpoint(object):
         self._rpc_worker_client = rpc_worker_client.WorkerClient()
 
     @staticmethod
-    def _create_task(instance, task_type, execution, depends_on=None):
+    def _create_task(instance, task_type, execution, depends_on=None,
+                     on_error=False):
         task = models.Task()
         task.id = str(uuid.uuid4())
         task.instance = instance
@@ -72,11 +73,12 @@ class ConductorServerEndpoint(object):
         task.status = constants.TASK_STATUS_PENDING
         task.task_type = task_type
         task.depends_on = depends_on
+        task.on_error = on_error
         return task
 
     def _begin_tasks(self, ctxt, execution, task_info={}):
         for task in execution.tasks:
-            if not task.depends_on:
+            if not task.depends_on and not task.on_error:
                 self._rpc_worker_client.begin_task(
                     ctxt, server=None,
                     task_id=task.id,
@@ -111,12 +113,12 @@ class ConductorServerEndpoint(object):
                     instance, constants.TASK_TYPE_DEPLOY_REPLICA_DISKS,
                     execution, depends_on=[get_instance_info_task.id])
 
-                deploy_replica_export_resources_task = self._create_task(
+                deploy_replica_source_resources_task = self._create_task(
                     instance,
                     constants.TASK_TYPE_DEPLOY_REPLICA_SOURCE_RESOURCES,
                     execution, depends_on=[deploy_replica_disks_task.id])
 
-                deploy_replica_resources_task = self._create_task(
+                deploy_replica_target_resources_task = self._create_task(
                     instance,
                     constants.TASK_TYPE_DEPLOY_REPLICA_TARGET_RESOURCES,
                     execution, depends_on=[deploy_replica_disks_task.id])
@@ -124,18 +126,20 @@ class ConductorServerEndpoint(object):
                 replicate_disks_task = self._create_task(
                     instance, constants.TASK_TYPE_REPLICATE_DISKS,
                     execution, depends_on=[
-                        deploy_replica_export_resources_task.id,
-                        deploy_replica_resources_task.id])
+                        deploy_replica_source_resources_task.id,
+                        deploy_replica_target_resources_task.id])
 
                 self._create_task(
                     instance,
                     constants.TASK_TYPE_DELETE_REPLICA_SOURCE_RESOURCES,
-                    execution, depends_on=[replicate_disks_task.id])
+                    execution, depends_on=[replicate_disks_task.id],
+                    on_error=True)
 
                 self._create_task(
                     instance,
                     constants.TASK_TYPE_DELETE_REPLICA_TARGET_RESOURCES,
-                    execution, depends_on=[replicate_disks_task.id])
+                    execution, depends_on=[replicate_disks_task.id],
+                    on_error=True)
 
         db_api.add_replica_tasks_execution(ctxt, execution)
         LOG.info("Replica tasks execution created: %s", execution.id)
@@ -368,17 +372,38 @@ class ConductorServerEndpoint(object):
         self._cancel_tasks_execution(ctxt, execution)
 
     def _cancel_tasks_execution(self, ctxt, execution):
+        has_running_tasks = False
         for task in execution.tasks:
-            if task.status in [constants.TASK_STATUS_PENDING,
-                               constants.TASK_STATUS_RUNNING]:
-                if task.status == constants.TASK_STATUS_RUNNING:
-                    self._rpc_worker_client.cancel_task(
-                        ctxt, task.host, task.process_id)
+            task_canceled = False
+            if task.status == constants.TASK_STATUS_RUNNING:
+                self._rpc_worker_client.cancel_task(
+                    ctxt, task.host, task.process_id)
+                task_canceled = True
+            elif task.status == constants.TASK_STATUS_PENDING:
+                if task.on_error:
+                    action = db_api.get_action(ctxt, execution.action_id)
+                    task_info = action.info.get(task.instance, {})
+
+                    self._rpc_worker_client.begin_task(
+                        ctxt, server=None,
+                        task_id=task.id,
+                        task_type=task.task_type,
+                        origin=action.origin,
+                        destination=action.destination,
+                        instance=task.instance,
+                        task_info=task_info)
+
+                    has_running_tasks = True
+                else:
+                    task_canceled = True
+
+            if task_canceled:
                 db_api.set_task_status(
                     ctxt, task.id, constants.TASK_STATUS_CANCELED)
 
-        db_api.set_execution_status(
-            ctxt, execution.id, constants.EXECUTION_STATUS_ERROR)
+        if not has_running_tasks:
+            db_api.set_execution_status(
+                ctxt, execution.id, constants.EXECUTION_STATUS_ERROR)
 
     @task_synchronized
     def set_task_host(self, ctxt, task_id, host, process_id):
@@ -387,10 +412,8 @@ class ConductorServerEndpoint(object):
             ctxt, task_id, constants.TASK_STATUS_RUNNING)
 
     def _start_pending_tasks(self, ctxt, execution, parent_task, task_info):
-        has_pending_tasks = False
         for task in execution.tasks:
             if task.status == constants.TASK_STATUS_PENDING:
-                has_pending_tasks = True
                 if task.depends_on and parent_task.id in task.depends_on:
                     start_task = True
                     for depend_task_id in task.depends_on:
@@ -416,7 +439,6 @@ class ConductorServerEndpoint(object):
                             destination=action.destination,
                             instance=task.instance,
                             task_info=task_info)
-        return has_pending_tasks
 
     @task_synchronized
     def task_completed(self, ctxt, task_id, task_info):
@@ -425,10 +447,9 @@ class ConductorServerEndpoint(object):
         db_api.set_task_status(
             ctxt, task_id, constants.TASK_STATUS_COMPLETED)
 
-        task = db_api.get_task(
-            ctxt, task_id, include_execution_tasks=True)
+        task = db_api.get_task(ctxt, task_id)
+        execution = db_api.get_tasks_execution(ctxt, task.execution_id)
 
-        execution = task.execution
         with lockutils.lock(execution.action_id):
             LOG.info("Setting instance %(instance)s "
                      "action info: %(task_info)s",
@@ -437,14 +458,27 @@ class ConductorServerEndpoint(object):
                 ctxt, execution.action_id, task.instance, task_info)
 
             if execution.status == constants.EXECUTION_STATUS_RUNNING:
-                has_pending_tasks = self._start_pending_tasks(
+                self._start_pending_tasks(
                     ctxt, execution, task, updated_task_info)
 
-                if not has_pending_tasks:
-                    LOG.info("Tasks execution completed: %s", execution.id)
+                if not [t for t in execution.tasks
+                        if t.status in [constants.TASK_STATUS_RUNNING,
+                                        constants.TASK_STATUS_PENDING]]:
+                    # The execution is in error status if there's one or more
+                    # tasks in error or canceled status
+                    if [t for t in execution.tasks
+                            if t.status in [constants.TASK_STATUS_ERROR,
+                                            constants.TASK_STATUS_CANCELED]]:
+                        execution_status = constants.EXECUTION_STATUS_ERROR
+                    else:
+                        execution_status = constants.EXECUTION_STATUS_COMPLETED
+
+                    LOG.info("Tasks execution %(execution_id)s completed "
+                             "with status: %(status)s",
+                             {"execution_id": execution.id,
+                              "status": execution_status})
                     db_api.set_execution_status(
-                        ctxt, execution.id,
-                        constants.EXECUTION_STATUS_COMPLETED)
+                        ctxt, execution.id, execution_status)
 
     @task_synchronized
     def set_task_error(self, ctxt, task_id, exception_details):
@@ -454,19 +488,11 @@ class ConductorServerEndpoint(object):
         db_api.set_task_status(
             ctxt, task_id, constants.TASK_STATUS_ERROR, exception_details)
 
-        task = db_api.get_task(
-            ctxt, task_id, include_execution_tasks=True)
-        execution = task.execution
+        task = db_api.get_task(ctxt, task_id)
+        execution = db_api.get_tasks_execution(ctxt, task.execution_id)
 
         with lockutils.lock(execution.action_id):
-            for task in execution.tasks:
-                if task.status == constants.TASK_STATUS_PENDING:
-                    db_api.set_task_status(
-                        ctxt, task.id, constants.TASK_STATUS_CANCELED)
-
-            LOG.error("Tasks execution failed: %s", execution.id)
-            db_api.set_execution_status(
-                ctxt, execution.id, constants.EXECUTION_STATUS_ERROR)
+            self._cancel_tasks_execution(ctxt, execution)
 
     @task_synchronized
     def task_event(self, ctxt, task_id, level, message):

+ 19 - 8
coriolis/db/api.py

@@ -211,8 +211,8 @@ def set_execution_status(context, execution_id, status):
     execution.status = status
 
 
-@enginefacade.writer
-def set_transfer_action_info(context, action_id, instance, instance_info):
+@enginefacade.reader
+def get_action(context, action_id):
     action = _soft_delete_aware_query(
         context, models.BaseTransferAction).filter(
             models.BaseTransferAction.project_id == context.tenant,
@@ -220,6 +220,12 @@ def set_transfer_action_info(context, action_id, instance, instance_info):
     if not action:
         raise exception.NotFound(
             "Transfer action not found: %s" % action_id)
+    return action
+
+
+@enginefacade.writer
+def set_transfer_action_info(context, action_id, instance, instance_info):
+    action = get_action(context, action_id)
 
     # Copy is needed, otherwise sqlalchemy won't save the changes
     action_info = action.info.copy()
@@ -237,7 +243,16 @@ def set_transfer_action_info(context, action_id, instance, instance_info):
 @enginefacade.reader
 def get_tasks_execution(context, execution_id):
     q = _soft_delete_aware_query(context, models.TasksExecution)
-    return q.filter_by(project_id=context.tenant, id=execution_id).first()
+    q = q.join(models.BaseTransferAction)
+    q = q.options(orm.joinedload("action"))
+    q = q.options(orm.joinedload("tasks"))
+    execution = q.filter(
+        models.BaseTransferAction.project_id == context.tenant,
+        models.TasksExecution.id == execution_id).first()
+    if not execution:
+        raise exception.NotFound(
+            "Tasks execution not found: %s" % execution_id)
+    return execution
 
 
 def _get_task(context, task_id):
@@ -263,12 +278,8 @@ def set_task_host(context, task_id, host, process_id):
 
 
 @enginefacade.reader
-def get_task(context, task_id, include_execution_tasks=False):
+def get_task(context, task_id):
     q = _soft_delete_aware_query(context, models.Task)
-    if include_execution_tasks:
-        q = q.options(
-            orm.joinedload("execution").joinedload("action")).options(
-                orm.joinedload("execution").joinedload("tasks"))
     return q.filter_by(id=task_id).first()
 
 

+ 1 - 0
coriolis/db/sqlalchemy/migrate_repo/versions/001_initial.py

@@ -61,6 +61,7 @@ def upgrade(migrate_engine):
                           nullable=False),
         sqlalchemy.Column("exception_details", sqlalchemy.Text, nullable=True),
         sqlalchemy.Column("depends_on", sqlalchemy.Text, nullable=True),
+        sqlalchemy.Column("on_error", sqlalchemy.Boolean, nullable=True),
         mysql_engine='InnoDB',
         mysql_charset='utf8'
     )

+ 1 - 0
coriolis/db/sqlalchemy/models.py

@@ -59,6 +59,7 @@ class Task(BASE, models.TimestampMixin, models.SoftDeleteMixin,
     task_type = sqlalchemy.Column(sqlalchemy.String(100), nullable=False)
     exception_details = sqlalchemy.Column(sqlalchemy.Text, nullable=True)
     depends_on = sqlalchemy.Column(types.List, nullable=True)
+    on_error = sqlalchemy.Column(sqlalchemy.Boolean, nullable=False)
     # TODO: Add soft delete filter
     events = orm.relationship(TaskEvent, cascade="all,delete",
                               backref=orm.backref('task'))

+ 8 - 6
coriolis/tasks/replica_tasks.py

@@ -160,10 +160,11 @@ class DeleteReplicaSourceResourcesTask(base.TaskRunner):
             origin["type"], constants.PROVIDER_TYPE_EXPORT, event_handler)
         connection_info = base.get_connection_info(ctxt, origin)
 
-        migr_resources = task_info["migr_source_resources"]
+        migr_resources = task_info.get("migr_source_resources")
 
-        provider.delete_replica_source_resources(
-            ctxt, connection_info, migr_resources)
+        if migr_resources:
+            provider.delete_replica_source_resources(
+                ctxt, connection_info, migr_resources)
 
         task_info["migr_source_resources"] = None
         task_info["migr_source_connection_info"] = None
@@ -203,10 +204,11 @@ class DeleteReplicaTargetResourcesTask(base.TaskRunner):
             destination["type"], constants.PROVIDER_TYPE_IMPORT, event_handler)
         connection_info = base.get_connection_info(ctxt, destination)
 
-        migr_resources = task_info["migr_target_resources"]
+        migr_resources = task_info.get("migr_target_resources")
 
-        provider.delete_replica_target_resources(
-            ctxt, connection_info, migr_resources)
+        if migr_resources:
+            provider.delete_replica_target_resources(
+                ctxt, connection_info, migr_resources)
 
         task_info["migr_target_resources"] = None
         task_info["migr_target_connection_info"] = None