Alessandro Pilotti 9 лет назад
Родитель
Сommit
3eeaaf6786

+ 4 - 1
coriolis/api/v1/migration_actions.py

@@ -16,7 +16,10 @@ class MigrationActionsController(api_wsgi.Controller):
     @api_wsgi.action('cancel')
     def _cancel(self, req, id, body):
         try:
-            self._migration_api.cancel(req.environ['coriolis.context'], id)
+            force = (body["cancel"] or {}).get("force", False)
+
+            self._migration_api.cancel(
+                req.environ['coriolis.context'], id, force)
             raise exc.HTTPNoContent()
         except exception.NotFound as ex:
             raise exc.HTTPNotFound(explanation=ex.msg)

+ 3 - 1
coriolis/api/v1/replica_tasks_execution_actions.py

@@ -16,8 +16,10 @@ class ReplicaTasksExecutionActionsController(api_wsgi.Controller):
     @api_wsgi.action('cancel')
     def _cancel(self, req, replica_id, id, body):
         try:
+            force = (body["cancel"] or {}).get("force", False)
+
             self._replica_tasks_execution_api.cancel(
-                req.environ['coriolis.context'], id)
+                req.environ['coriolis.context'], id, force)
             raise exc.HTTPNoContent()
         except exception.NotFound as ex:
             raise exc.HTTPNotFound(explanation=ex.msg)

+ 4 - 4
coriolis/conductor/rpc/client.py

@@ -36,10 +36,10 @@ class ConductorClient(object):
             ctxt, 'delete_replica_tasks_execution',
             execution_id=execution_id)
 
-    def cancel_replica_tasks_execution(self, ctxt, execution_id):
+    def cancel_replica_tasks_execution(self, ctxt, execution_id, force):
         return self._client.call(
             ctxt, 'cancel_replica_tasks_execution',
-            execution_id=execution_id)
+            execution_id=execution_id, force=force)
 
     def create_instances_replica(self, ctxt, origin, destination, instances):
         return self._client.call(
@@ -85,9 +85,9 @@ class ConductorClient(object):
         self._client.call(
             ctxt, 'delete_migration', migration_id=migration_id)
 
-    def cancel_migration(self, ctxt, migration_id):
+    def cancel_migration(self, ctxt, migration_id, force):
         self._client.call(
-            ctxt, 'cancel_migration', migration_id=migration_id)
+            ctxt, 'cancel_migration', migration_id=migration_id, force=force)
 
     def set_task_host(self, ctxt, task_id, host, process_id):
         self._client.call(

+ 6 - 6
coriolis/conductor/rpc/server.py

@@ -174,13 +174,13 @@ class ConductorServerEndpoint(object):
         db_api.delete_replica_tasks_execution(ctxt, execution_id)
 
     @tasks_execution_synchronized
-    def cancel_replica_tasks_execution(self, ctxt, execution_id):
+    def cancel_replica_tasks_execution(self, ctxt, execution_id, force):
         execution = self._get_replica_tasks_execution(
             ctxt, execution_id)
         if execution.status != constants.EXECUTION_STATUS_RUNNING:
             raise exception.InvalidReplicaState(
                 "The replica tasks execution is not running")
-        self._cancel_tasks_execution(ctxt, execution)
+        self._cancel_tasks_execution(ctxt, execution, force)
 
     def _get_replica_tasks_execution(self, ctxt, execution_id):
         execution = db_api.get_replica_tasks_execution(
@@ -394,21 +394,21 @@ class ConductorServerEndpoint(object):
         db_api.delete_migration(ctxt, migration_id)
 
     @migration_synchronized
-    def cancel_migration(self, ctxt, migration_id):
+    def cancel_migration(self, ctxt, migration_id, force):
         migration = self._get_migration(ctxt, migration_id)
         execution = migration.executions[0]
         if execution.status != constants.EXECUTION_STATUS_RUNNING:
             raise exception.InvalidMigrationState(
                 "The migration is not running")
         execution = migration.executions[0]
-        self._cancel_tasks_execution(ctxt, execution)
+        self._cancel_tasks_execution(ctxt, execution, force)
 
-    def _cancel_tasks_execution(self, ctxt, execution):
+    def _cancel_tasks_execution(self, ctxt, execution, force=False):
         has_running_tasks = False
         for task in execution.tasks:
             if task.status == constants.TASK_STATUS_RUNNING:
                 self._rpc_worker_client.cancel_task(
-                    ctxt, task.host, task.process_id)
+                    ctxt, task.host, task.process_id, force)
                 has_running_tasks = True
             elif (task.status == constants.TASK_STATUS_PENDING and
                     not task.on_error):

+ 2 - 2
coriolis/migrations/api.py

@@ -19,8 +19,8 @@ class API(object):
     def delete(self, ctxt, migration_id):
         self._rpc_client.delete_migration(ctxt, migration_id)
 
-    def cancel(self, ctxt, migration_id):
-        self._rpc_client.cancel_migration(ctxt, migration_id)
+    def cancel(self, ctxt, migration_id, force):
+        self._rpc_client.cancel_migration(ctxt, migration_id, force)
 
     def get_migrations(self, ctxt, include_tasks=False):
         return self._rpc_client.get_migrations(ctxt, include_tasks)

+ 2 - 2
coriolis/replica_tasks_executions/api.py

@@ -16,9 +16,9 @@ class API(object):
         self._rpc_client.delete_replica_tasks_execution(
             ctxt, execution_id)
 
-    def cancel(self, ctxt, execution_id):
+    def cancel(self, ctxt, execution_id, force):
         self._rpc_client.cancel_replica_tasks_execution(
-            ctxt, execution_id)
+            ctxt, execution_id, force)
 
     def get_executions(self, ctxt, replica_id, include_tasks=False):
         return self._rpc_client.get_replica_tasks_executions(

+ 2 - 2
coriolis/worker/rpc/client.py

@@ -21,10 +21,10 @@ class WorkerClient(object):
             origin=origin, destination=destination, instance=instance,
             task_info=task_info)
 
-    def cancel_task(self, ctxt, server, process_id):
+    def cancel_task(self, ctxt, server, process_id, force):
         # Needs to be executed on the same server
         cctxt = self._client.prepare(server=server)
-        cctxt.call(ctxt, 'cancel_task', process_id=process_id)
+        cctxt.call(ctxt, 'cancel_task', process_id=process_id, force=force)
 
     def update_migration_status(self, ctxt, task_id, status):
         self._client.call(ctxt, "update_migration_status", status=status)

+ 11 - 7
coriolis/worker/rpc/server.py

@@ -75,17 +75,21 @@ class WorkerServerEndpoint(object):
             # Ignore the exception
             LOG.exception(ex)
 
-    def cancel_task(self, ctxt, process_id):
+    def cancel_task(self, ctxt, process_id, force):
+        if not force and os.name == "nt":
+            LOG.warn("Windows does not support SIGINT, performing a "
+                     "forced task termination")
+            force = True
+
         try:
             p = psutil.Process(process_id)
-            if os.name != "nt":
+
+            if force:
+                LOG.warn("Killing process: %s", process_id)
+                p.kill()
+            else:
                 LOG.info("Sending SIGINT to process: %s", process_id)
                 p.send_signal(signal.SIGINT)
-            else:
-                LOG.warn(
-                    "Windows does not support SIGINT, killing process: %s",
-                    process_id)
-                p.kill()
         except psutil.NoSuchProcess:
             LOG.info("Task process not found: %s", process_id)