Переглянути джерело

Fix deadlock, convert info field

  * There was a deadlock in worker.rpc.server, triggered by a large
enough task_info, due to the maximum buffer size of named pipes on Linux.
We were calling join() on the process before consuming the Queue().
The process would never finish, because it would hang on queue.put().
  * Migrated the info field to LONGBLOB due to the new size of
task_info. We are now saving the disk state from previous runs where
the replicator is involved.
Gabriel Adrian Samfira 7 роки тому
батько
коміт
429fe195f0

+ 13 - 0
coriolis/db/sqlalchemy/migrate_repo/versions/009_Migrate_info_to_blob.py

@@ -0,0 +1,13 @@
+import sqlalchemy
+from sqlalchemy import types
+
+
+def upgrade(migrate_engine):
+    """Change the type of ``base_transfer_action.info`` to a binary column.
+
+    :param migrate_engine: engine the migration runs against; it is bound
+        to the metadata used to reflect the table.
+    """
+    meta = sqlalchemy.MetaData()
+    meta.bind = migrate_engine
+
+    # Reflect the existing table definition from the database.
+    base_transfer_action = sqlalchemy.Table(
+        'base_transfer_action', meta, autoload=True)
+
+    # 4294967295 is 2**32 - 1; per the commit message this is meant to
+    # produce a LONGBLOB column so that large task_info values fit.
+    base_transfer_action.c.info.alter(type=types.Binary(4294967295))
+

+ 1 - 1
coriolis/db/sqlalchemy/models.py

@@ -103,7 +103,7 @@ class BaseTransferAction(BASE, models.TimestampMixin, models.ModelBase,
                                   "base_id==TasksExecution.action_id, "
                                   "TasksExecution.deleted=='0')")
     instances = sqlalchemy.Column(types.List, nullable=False)
-    info = sqlalchemy.Column(types.Json, nullable=False)
+    info = sqlalchemy.Column(types.Bson, nullable=False)
     notes = sqlalchemy.Column(sqlalchemy.Text, nullable=True)
     origin_endpoint_id = sqlalchemy.Column(
         sqlalchemy.String(36),

+ 21 - 0
coriolis/db/sqlalchemy/types.py

@@ -30,6 +30,16 @@ class LongText(types.TypeDecorator):
             return self.impl
 
 
+class Blob(types.TypeDecorator):
+    """Binary column type with an explicit 4294967295-byte BLOB on MySQL."""
+    impl = types.Binary
+
+    def load_dialect_impl(self, dialect):
+        # MySQL gets a BLOB with an explicit length of 2**32 - 1 bytes;
+        # every other dialect falls back to the generic binary impl.
+        if dialect.name == 'mysql':
+            return dialect.type_descriptor(mysql.BLOB(4294967295))
+        else:
+            return self.impl
+
+
 class Json(LongText):
 
     def process_bind_param(self, value, dialect):
@@ -41,6 +51,17 @@ class Json(LongText):
         return jsonutils.loads(value)
 
 
+class Bson(Blob):
+    """Stores a value as UTF-8 encoded JSON bytes in a ``Blob`` column.
+
+    NOTE(review): despite the name, the payload is JSON text produced by
+    ``jsonutils``, encoded to bytes; no BSON encoding is involved.
+    """
+
+    def process_bind_param(self, value, dialect):
+        # Serialize to JSON text, then encode so the driver receives bytes.
+        # NOTE(review): ``None`` is not special-cased here, unlike in
+        # process_result_value below - confirm that is intended.
+        return jsonutils.dumps(value).encode('utf-8')
+
+    def process_result_value(self, value, dialect):
+        # A missing value is passed through as None instead of being decoded.
+        if value is None:
+            return None
+        return jsonutils.loads(value.decode('utf-8'))
+
+
 class List(types.TypeDecorator):
     impl = types.Text
 

+ 5 - 22
coriolis/providers/replicator.py

@@ -54,14 +54,6 @@ class Client(object):
         self._port_via_tunnel = None
         self._test_connection()
 
-    def __del__(self):
-        if self._tunnel is not None:
-            try:
-                self._tunnel.stop()
-            except BaseException as err:
-                LOG.warning(
-                    "failed to stop tunnel: %s" % err)
-
     @property
     def repl_host(self):
         if self._ip_via_tunnel is not None:
@@ -287,7 +279,7 @@ class Replicator(object):
                 if perc_step is None:
                     perc_step = self._event_manager.add_percentage_step(
                         100,
-                        message_format=("Disk %s chunk processing progress: "
+                        message_format=("Chunking progress for disk %s: "
                                         "{:.0f}%%") % devName)
                     perc_steps[devName] = perc_step
                 perc_done = vol["checksum-status"]["percentage"]
@@ -517,20 +509,11 @@ class Replicator(object):
             "ca_cert": caCert,
         }
 
-    def _update_state(self, volumes_info):
-        """
-        should be called from replicate_disks every time a disk
-        is successfully synced. Coriolis does not yet send partial
-        updates with migration info, so I suppose if we call it once
-        on replica success, it should be enough.
-        """
-        pass
-
     def _get_size_from_chunks(self, chunks):
         ret = 0
         for chunk in chunks:
             ret += chunk["length"]
-        return ret / units.Gi
+        return ret / units.Mi
 
     def _find_vol_state(self, name, state):
         for vol in state:
@@ -591,7 +574,7 @@ class Replicator(object):
 
             size = self._get_size_from_chunks(chunks)
 
-            msg = ("Replicating disk %s (%s GB):"
+            msg = ("Disk replication progress for %s (%.3f MB):"
                    " {:.0f}%%") % (volume["disk_path"], size)
             perc_step = self._event_manager.add_percentage_step(
                 len(chunks), message_format=msg)
@@ -658,8 +641,8 @@ class Replicator(object):
             # create sparse file
             fp.truncate(size)
             perc_step = self._event_manager.add_percentage_step(
-                len(chunks), message_format="Downloading disk /dev/%s (%s GB):"
-                " {:.0f}%%" % (disk, size_from_chunks))
+                len(chunks), message_format="Disk download progress for "
+                "/dev/%s (%s GB): {:.0f}%%" % (disk, size_from_chunks))
             for chunk in chunks:
                 offset = int(chunk["offset"])
                 # seek to offset

+ 0 - 1
coriolis/secrets.py

@@ -9,7 +9,6 @@ from coriolis import keystone
 
 
 def get_secret(ctxt, secret_ref):
-    keystone.create_trust(ctxt)
     session = keystone.create_keystone_session(ctxt)
     barbican = barbican_client.Client(session=session)
     return json.loads(barbican.secrets.get(secret_ref).payload)

+ 26 - 5
coriolis/worker/rpc/server.py

@@ -2,10 +2,13 @@
 # All Rights Reserved.
 
 import multiprocessing
+
 import os
 import shutil
+import time
 import signal
 import sys
+import eventlet
 
 from logging import handlers
 from oslo_config import cfg
@@ -149,6 +152,24 @@ class WorkerServerEndpoint(object):
         return task_runner.get_shared_libs_for_providers(
             ctxt, origin, destination, event_handler)
 
+    def _wait_for_process(self, p, mp_q):
+        """Read the task result from ``mp_q`` while process ``p`` runs.
+
+        The queue has to be consumed before the process is joined: a
+        child that put a large result blocks in ``put()`` until the
+        parent reads it, so joining first would deadlock.
+
+        Returns as soon as a result has been read; it does not wait for
+        the process to exit.
+
+        :param p: the task process; only ``is_alive()`` is used.
+        :param mp_q: queue on which the task process puts its result.
+        :returns: the first item read from ``mp_q``, or ``None`` if the
+            process exited without a result being read.
+        """
+        # Imported locally so that ``queue.Empty`` is guaranteed to
+        # resolve regardless of the module-level imports.
+        import queue
+
+        result = None
+        while result is None:
+            try:
+                result = mp_q.get(timeout=1)
+            except queue.Empty:
+                if p.is_alive():
+                    continue
+                # The process may have put its result and exited between
+                # the timeout above and the liveness check, so try one
+                # last non-blocking read before giving up.
+                try:
+                    result = mp_q.get(False)
+                except Exception:
+                    # Best effort: nothing (readable) was left behind.
+                    pass
+                break
+        return result
+
     def _exec_task_process(self, ctxt, task_id, task_type, origin, destination,
                            instance, task_info):
         mp_ctx = multiprocessing.get_context('spawn')
@@ -167,12 +188,14 @@ class WorkerServerEndpoint(object):
         self._rpc_conductor_client.set_task_host(
             ctxt, task_id, self._server, p.pid)
 
-        self._handle_mp_log_events(p, mp_log_q)
+        evt = eventlet.spawn(self._wait_for_process, p, mp_q)
+        eventlet.spawn(self._handle_mp_log_events, p, mp_log_q)
+
+        result = evt.wait()
         p.join()
 
-        if mp_q.empty():
+        if not result:
             raise exception.CoriolisException("Task canceled")
-        result = mp_q.get(False)
 
         if isinstance(result, str):
             raise exception.TaskProcessException(result)
@@ -414,10 +437,8 @@ def _task_process(ctxt, task_id, task_type, origin, destination, instance,
 
         new_task_info = task_runner.run(
             ctxt, instance, origin, destination, task_info, event_handler)
-
         # mq_p.put() doesn't raise if new_task_info is not serializable
         utils.is_serializable(new_task_info)
-
         mp_q.put(new_task_info)
     except Exception as ex:
         mp_q.put(str(ex))