Explorar o código

Task error handling improvements

Alessandro Pilotti hai 9 anos
pai
achega
5f90d8b7e9
Modificáronse 3 ficheiros con 46 adicións e 27 borrados
  1. 44 26
      coriolis/conductor/rpc/server.py
  2. 1 0
      coriolis/constants.py
  3. 1 1
      coriolis/tasks/replica_tasks.py

+ 44 - 26
coriolis/conductor/rpc/server.py

@@ -70,15 +70,21 @@ class ConductorServerEndpoint(object):
         task.id = str(uuid.uuid4())
         task.instance = instance
         task.execution = execution
-        task.status = constants.TASK_STATUS_PENDING
         task.task_type = task_type
         task.depends_on = depends_on
         task.on_error = on_error
+
+        if not depends_on and on_error:
+            task.status = constants.TASK_STATUS_ON_ERROR_ONLY
+        else:
+            task.status = constants.TASK_STATUS_PENDING
+
         return task
 
     def _begin_tasks(self, ctxt, execution, task_info={}):
         for task in execution.tasks:
-            if not task.depends_on and not task.on_error:
+            if (not task.depends_on and
+                    task.status == constants.TASK_STATUS_PENDING):
                 self._rpc_worker_client.begin_task(
                     ctxt, server=None,
                     task_id=task.id,
@@ -384,7 +390,8 @@ class ConductorServerEndpoint(object):
                 self._rpc_worker_client.cancel_task(
                     ctxt, task.host, task.process_id)
                 task_canceled = True
-            elif task.status == constants.TASK_STATUS_PENDING:
+            elif task.status in [constants.TASK_STATUS_PENDING,
+                                 constants.TASK_STATUS_ON_ERROR_ONLY]:
                 if task.on_error:
                     action = db_api.get_action(ctxt, execution.action_id)
                     task_info = action.info.get(task.instance, {})
@@ -446,24 +453,38 @@ class ConductorServerEndpoint(object):
                             task_info=task_info)
 
     def _update_replica_volumes_info(self, ctxt, migration_id, instance,
-                                     task_info):
-        # When restoring a snapshot in some import providers (e.g. OpenStack),
-        # a new volume_id is generated. This needs to be updated in the
-        # Replica instance as well.
-        volumes_info = task_info.get("volumes_info")
-        if volumes_info:
-            updated_task_info = {"volumes_info": volumes_info}
-
-            migration = db_api.get_migration(ctxt, migration_id)
-            replica_id = migration.replica_id
-
-            with lockutils.lock(replica_id):
-                LOG.debug(
-                    "Updating volume_info in replica due to snapshot "
-                    "restore during migration. replica id: %s", replica_id)
-                db_api.set_transfer_action_info(
-                    ctxt, replica_id, instance,
-                    updated_task_info)
+                                     updated_task_info):
+        migration = db_api.get_migration(ctxt, migration_id)
+        replica_id = migration.replica_id
+
+        with lockutils.lock(replica_id):
+            LOG.debug(
+                "Updating volume_info in replica due to snapshot "
+                "restore during migration. replica id: %s", replica_id)
+            db_api.set_transfer_action_info(
+                ctxt, replica_id, instance,
+                updated_task_info)
+
+    def _handle_post_task_actions(self, ctxt, task, execution, task_info):
+        task_type = task.task_type
+        updated_task_info = None
+
+        if task_type == constants.TASK_TYPE_RESTORE_REPLICA_DISK_SNAPSHOTS:
+            # When restoring a snapshot in some import providers (OpenStack),
+            # a new volume_id is generated. This needs to be updated in the
+            # Replica instance as well.
+            volumes_info = task_info.get("volumes_info")
+            if volumes_info:
+                updated_task_info = {"volumes_info": volumes_info}
+        elif task_type == constants.TASK_TYPE_DELETE_REPLICA_DISK_SNAPSHOTS:
+            # The migration completed. If the replica is executed again,
+            # new volumes need to be deployed in place of the migrated ones.
+            updated_task_info = {"volumes_info": None}
+
+        if updated_task_info:
+            self._update_replica_volumes_info(
+                ctxt, execution.action_id, task.instance,
+                updated_task_info)
 
     @task_synchronized
     def task_completed(self, ctxt, task_id, task_info):
@@ -483,11 +504,8 @@ class ConductorServerEndpoint(object):
             updated_task_info = db_api.set_transfer_action_info(
                 ctxt, action_id, task.instance, task_info)
 
-            if (task.task_type ==
-                    constants.TASK_TYPE_RESTORE_REPLICA_DISK_SNAPSHOTS):
-                self._update_replica_volumes_info(
-                    ctxt, execution.action_id, task.instance,
-                    updated_task_info)
+            self._handle_post_task_actions(
+                ctxt, task, execution, updated_task_info)
 
             if execution.status == constants.EXECUTION_STATUS_RUNNING:
                 self._start_pending_tasks(

+ 1 - 0
coriolis/constants.py

@@ -10,6 +10,7 @@ TASK_STATUS_RUNNING = "RUNNING"
 TASK_STATUS_COMPLETED = "COMPLETED"
 TASK_STATUS_ERROR = "ERROR"
 TASK_STATUS_CANCELED = "CANCELED"
+TASK_STATUS_ON_ERROR_ONLY = "EXECUTE_ON_ERROR_ONLY"
 
 TASK_TYPE_EXPORT_INSTANCE = "EXPORT_INSTANCE"
 TASK_TYPE_IMPORT_INSTANCE = "IMPORT_INSTANCE"

+ 1 - 1
coriolis/tasks/replica_tasks.py

@@ -106,7 +106,7 @@ class DeployReplicaDisksTask(base.TaskRunner):
             destination["type"], constants.PROVIDER_TYPE_IMPORT, event_handler)
         connection_info = base.get_connection_info(ctxt, destination)
 
-        volumes_info = task_info.get("volumes_info", [])
+        volumes_info = task_info.get("volumes_info") or []
 
         volumes_info = provider.deploy_replica_disks(
             ctxt, connection_info, target_environment, instance, export_info,