Просмотр исходного кода

Cancels tasks with signal for proper cleanup

Alessandro Pilotti 9 лет назад
Родитель
Commit
b2e52e17a5

+ 22 - 22
coriolis/conductor/rpc/server.py

@@ -406,34 +406,34 @@ class ConductorServerEndpoint(object):
     def _cancel_tasks_execution(self, ctxt, execution):
         has_running_tasks = False
         for task in execution.tasks:
-            task_canceled = False
             if task.status == constants.TASK_STATUS_RUNNING:
                 self._rpc_worker_client.cancel_task(
                     ctxt, task.host, task.process_id)
-                task_canceled = True
-            elif task.status in [constants.TASK_STATUS_PENDING,
-                                 constants.TASK_STATUS_ON_ERROR_ONLY]:
-                if task.on_error:
-                    action = db_api.get_action(ctxt, execution.action_id)
-                    task_info = action.info.get(task.instance, {})
-
-                    self._rpc_worker_client.begin_task(
-                        ctxt, server=None,
-                        task_id=task.id,
-                        task_type=task.task_type,
-                        origin=action.origin,
-                        destination=action.destination,
-                        instance=task.instance,
-                        task_info=task_info)
-
-                    has_running_tasks = True
-                else:
-                    task_canceled = True
-
-            if task_canceled:
+                has_running_tasks = True
+            elif (task.status == constants.TASK_STATUS_PENDING and
+                    not task.on_error):
                 db_api.set_task_status(
                     ctxt, task.id, constants.TASK_STATUS_CANCELED)
 
+        if not has_running_tasks:
+            for task in execution.tasks:
+                if task.status in [constants.TASK_STATUS_PENDING,
+                                   constants.TASK_STATUS_ON_ERROR_ONLY]:
+                    if task.on_error:
+                        action = db_api.get_action(ctxt, execution.action_id)
+                        task_info = action.info.get(task.instance, {})
+
+                        self._rpc_worker_client.begin_task(
+                            ctxt, server=None,
+                            task_id=task.id,
+                            task_type=task.task_type,
+                            origin=action.origin,
+                            destination=action.destination,
+                            instance=task.instance,
+                            task_info=task_info)
+
+                        has_running_tasks = True
+
         if not has_running_tasks:
             db_api.set_execution_status(
                 ctxt, execution.id, constants.EXECUTION_STATUS_ERROR)

+ 1 - 1
coriolis/providers/openstack/__init__.py

@@ -758,7 +758,7 @@ class ImportProvider(base.BaseReplicaImportProvider):
             self._create_target_instance(
                 nova, config.flavor_name, instance_name,
                 config.keypair_name, ports, volumes)
-        except Exception:
+        except:
             if not volumes_info:
                 # Don't remove replica volumes
                 self._event_manager.progress_update("Deleting volumes")

+ 14 - 0
coriolis/providers/vmware_vsphere/__init__.py

@@ -100,6 +100,20 @@ class _SSHBackupWriter(_BaseBackupWriter):
         self._pkey = pkey
         self._password = password
         self._volumes_info = volumes_info
+        self._ssh = None
+
+    @contextlib.contextmanager
+    def open(self, path, disk_id):
+        self._path = path
+        self._disk_id = disk_id
+        self._open()
+        try:
+            yield self
+            # Don't send a message via ssh on exception
+            self.close()
+        except:
+            self._ssh.close()
+            raise
 
     @utils.retry_on_error()
     def _connect_ssh(self):

+ 10 - 0
coriolis/providers/vmware_vsphere/vixdisklib.py

@@ -5,6 +5,10 @@ import contextlib
 import ctypes
 import os
 
+from oslo_log import log as logging
+
+LOG = logging.getLogger(__name__)
+
 if os.name == 'nt':
     vixDiskLibName = 'vixDiskLib.dll'
 else:
@@ -144,6 +148,8 @@ def get_transport_modes():
 @contextlib.contextmanager
 def connect(server_name, thumbprint, username, password, vmx_spec=None,
             snapshot_ref=None, read_only=True, transport_modes=None, port=443):
+    LOG.debug("Connecting VixDiskLib: %s", server_name)
+
     connectParams = VixDiskLibConnectParams()
 
     connectParams.serverName = server_name.encode()
@@ -175,6 +181,8 @@ def connect(server_name, thumbprint, username, password, vmx_spec=None,
 
 @contextlib.contextmanager
 def open(conn, disk_path, flags=VIXDISKLIB_FLAG_OPEN_READ_ONLY):
+    LOG.debug("Openning VixDiskLib disk: %s", disk_path)
+
     disk_handle = ctypes.c_void_p()
     _check_err(vixDiskLib.VixDiskLib_Open(
         conn, disk_path.encode(), flags, ctypes.byref(disk_handle)))
@@ -223,10 +231,12 @@ def read(disk_handle, start_sector, num_sectors, buf):
 
 
 def close(disk_handle):
+    LOG.debug("Closing VixDiskLib disk handle: %s", disk_handle)
     _check_err(vixDiskLib.VixDiskLib_Close(disk_handle))
 
 
 def disconnect(conn):
+    LOG.debug("Disconnecting VixDiskLib")
     _check_err(vixDiskLib.VixDiskLib_Disconnect(conn))
 
 

+ 4 - 0
coriolis/utils.py

@@ -57,6 +57,10 @@ def retry_on_error(max_attempts=5, sleep_seconds=0,
             while True:
                 try:
                     return func(*args, **kwargs)
+                except KeyboardInterrupt as ex:
+                    LOG.debug("Got a KeyboardInterrupt, skip retrying")
+                    LOG.exception(ex)
+                    raise
                 except Exception as ex:
                     if any([isinstance(ex, tex)
                             for tex in terminal_exceptions]):

+ 9 - 1
coriolis/worker/rpc/server.py

@@ -6,6 +6,7 @@ import multiprocessing
 import os
 import queue
 import shutil
+import signal
 import sys
 
 import psutil
@@ -77,7 +78,14 @@ class WorkerServerEndpoint(object):
     def cancel_task(self, ctxt, process_id):
         try:
             p = psutil.Process(process_id)
-            p.kill()
+            if os.name != "nt":
+                LOG.info("Sending SIGINT to process: %s", process_id)
+                p.send_signal(signal.SIGINT)
+            else:
+                LOG.warn(
+                    "Windows does not support SIGINT, killing process: %s",
+                    process_id)
+                p.kill()
         except psutil.NoSuchProcess:
             LOG.info("Task process not found: %s", process_id)