Просмотр исходного кода

Merge branch 'master' into test-execution-tasks

Nashwan Azhari 4 года назад
Родитель
Commit
70ff4e1cda

+ 5 - 10
coriolis/api/v1/migrations.py

@@ -30,7 +30,7 @@ class MigrationController(api_wsgi.Controller):
 
         return migration_view.single(req, migration)
 
-    def index(self, req):
+    def _list(self, req):
         show_deleted = api_utils._get_show_deleted(
             req.GET.get("show_deleted", None))
         context = req.environ["coriolis.context"]
@@ -40,16 +40,11 @@ class MigrationController(api_wsgi.Controller):
             req, self._migration_api.get_migrations(
                 context, include_tasks=False))
 
+    def index(self, req):
+        return self._list(req)
+
     def detail(self, req):
-        show_deleted = api_utils._get_show_deleted(
-            req.GET.get("show_deleted", None))
-        context = req.environ["coriolis.context"]
-        context.show_deleted = show_deleted
-        context.can(
-            migration_policies.get_migrations_policy_label("show_execution"))
-        return migration_view.collection(
-            req, self._migration_api.get_migrations(
-                context, include_tasks=True))
+        return self._list(req)
 
     @api_utils.format_keyerror_message(resource='migration', method='create')
     def _validate_migration_input(self, context, body):

+ 5 - 10
coriolis/api/v1/replicas.py

@@ -31,7 +31,7 @@ class ReplicaController(api_wsgi.Controller):
 
         return replica_view.single(req, replica)
 
-    def index(self, req):
+    def _list(self, req):
         show_deleted = api_utils._get_show_deleted(
             req.GET.get("show_deleted", None))
         context = req.environ["coriolis.context"]
@@ -41,16 +41,11 @@ class ReplicaController(api_wsgi.Controller):
             req, self._replica_api.get_replicas(
                 context, include_tasks_executions=False))
 
+    def index(self, req):
+        return self._list(req)
+
     def detail(self, req):
-        show_deleted = api_utils._get_show_deleted(
-            req.GET.get("show_deleted", None))
-        context = req.environ["coriolis.context"]
-        context.show_deleted = show_deleted
-        context.can(
-            replica_policies.get_replicas_policy_label("show_executions"))
-        return replica_view.collection(
-            req, self._replica_api.get_replicas(
-                context, include_tasks_executions=True))
+        return self._list(req)
 
     @api_utils.format_keyerror_message(resource='replica', method='create')
     def _validate_create_body(self, context, body):

+ 10 - 0
coriolis/cmd/api.py

@@ -3,17 +3,27 @@
 
 import sys
 
+from oslo_concurrency import processutils
 from oslo_config import cfg
 
 from coriolis import service
 from coriolis import utils
 
+api_opts = [
+    cfg.IntOpt('worker_count',
+               min=1, default=processutils.get_worker_count(),
+               help='Number of processes in which the service will be running')
+]
+
 CONF = cfg.CONF
+CONF.register_opts(api_opts, 'api')
 
 
 def main():
     worker_count, args = service.get_worker_count_from_args(sys.argv)
     CONF(args[1:], project='coriolis', version="1.0.0")
+    if not worker_count:
+        worker_count = CONF.api.worker_count
     utils.setup_logging()
 
     server = service.WSGIService(

+ 10 - 0
coriolis/cmd/conductor.py

@@ -3,6 +3,7 @@
 
 import sys
 
+from oslo_concurrency import processutils
 from oslo_config import cfg
 
 from coriolis import constants
@@ -10,12 +11,21 @@ from coriolis.conductor.rpc import server as rpc_server
 from coriolis import service
 from coriolis import utils
 
+conductor_opts = [
+    cfg.IntOpt('worker_count',
+               min=1, default=processutils.get_worker_count(),
+               help='Number of processes in which the service will be running')
+]
+
 CONF = cfg.CONF
+CONF.register_opts(conductor_opts, 'conductor')
 
 
 def main():
     worker_count, args = service.get_worker_count_from_args(sys.argv)
     CONF(args[1:], project='coriolis', version="1.0.0")
+    if not worker_count:
+        worker_count = CONF.conductor.worker_count
     utils.setup_logging()
     service.check_locks_dir_empty()
 

+ 8 - 1
coriolis/cmd/minion_manager.py

@@ -10,7 +10,14 @@ from coriolis import service
 from coriolis import utils
 from coriolis.minion_manager.rpc import server as rpc_server
 
+minion_manager_opts = [
+    cfg.IntOpt('worker_count',
+               min=1, default=1,
+               help='Number of processes in which the service will be running')
+]
+
 CONF = cfg.CONF
+CONF.register_opts(minion_manager_opts, 'minion_manager')
 
 
 def main():
@@ -21,7 +28,7 @@ def main():
     server = service.MessagingService(
         constants.MINION_MANAGER_MAIN_MESSAGING_TOPIC,
         [rpc_server.MinionManagerServerEndpoint()],
-        rpc_server.VERSION, worker_count=1)
+        rpc_server.VERSION, worker_count=CONF.minion_manager.worker_count)
     launcher = service.service.launch(
         CONF, server, workers=server.get_workers_count())
     launcher.wait()

+ 8 - 1
coriolis/cmd/scheduler.py

@@ -10,7 +10,14 @@ from coriolis import service
 from coriolis import utils
 from coriolis.scheduler.rpc import server as rpc_server
 
+scheduler_opts = [
+    cfg.IntOpt('worker_count',
+               min=1, default=1,
+               help='Number of processes in which the service will be running')
+]
+
 CONF = cfg.CONF
+CONF.register_opts(scheduler_opts, 'scheduler')
 
 
 def main():
@@ -21,7 +28,7 @@ def main():
     server = service.MessagingService(
         constants.SCHEDULER_MAIN_MESSAGING_TOPIC,
         [rpc_server.SchedulerServerEndpoint()],
-        rpc_server.VERSION, worker_count=1)
+        rpc_server.VERSION, worker_count=CONF.scheduler.worker_count)
     launcher = service.service.launch(
         CONF, server, workers=server.get_workers_count())
     launcher.wait()

+ 11 - 1
coriolis/cmd/worker.py

@@ -3,6 +3,7 @@
 
 import sys
 
+from oslo_concurrency import processutils
 from oslo_config import cfg
 
 from coriolis import constants
@@ -10,18 +11,27 @@ from coriolis import service
 from coriolis import utils
 from coriolis.worker.rpc import server as rpc_server
 
+worker_opts = [
+    cfg.IntOpt('worker_count',
+               min=1, default=processutils.get_worker_count(),
+               help='Number of processes in which the service will be running')
+]
+
 CONF = cfg.CONF
+CONF.register_opts(worker_opts, 'worker')
 
 
 def main():
     worker_count, args = service.get_worker_count_from_args(sys.argv)
     CONF(args[1:], project='coriolis', version="1.0.0")
+    if not worker_count:
+        worker_count = CONF.worker.worker_count
     utils.setup_logging()
 
     server = service.MessagingService(
         constants.WORKER_MAIN_MESSAGING_TOPIC,
         [rpc_server.WorkerServerEndpoint()],
-        rpc_server.VERSION, worker_count=worker_count)
+        rpc_server.VERSION, worker_count=worker_count, init_rpc=False)
     launcher = service.service.launch(
         CONF, server, workers=server.get_workers_count())
     launcher.wait()

+ 33 - 5
coriolis/events.py

@@ -3,6 +3,7 @@
 
 import abc
 import collections
+import copy
 
 from oslo_log import log as logging
 from six import with_metaclass
@@ -13,13 +14,14 @@ from coriolis import constants
 LOG = logging.getLogger(__name__)
 
 _PercStepData = collections.namedtuple(
-    "_PercStepData", "progress_update_id last_value total_steps")
+    "_PercStepData", "progress_update_id last_perc last_value total_steps")
 
 
 class EventManager(object, with_metaclass(abc.ABCMeta)):
 
     def __init__(self, event_handler):
         self._event_handler = event_handler
+        self._perc_steps = {}
 
     def _call_event_handler(self, method_name, *args, **kwargs):
         if self._event_handler:
@@ -50,12 +52,38 @@ class EventManager(object, with_metaclass(abc.ABCMeta)):
             self._call_event_handler(
                 'get_progress_update_identifier', progress_update))
 
-        return _PercStepData(progress_update_id, initial_step, total_steps)
+        perc = 0
+        if initial_step > 0 and total_steps > 0:
+            perc = int(initial_step * 100 // total_steps)
+        self._perc_steps[progress_update_id] = _PercStepData(
+                progress_update_id, perc, initial_step, total_steps)
+
+        return self._perc_steps[progress_update_id]
 
     def set_percentage_step(self, step, new_current_step):
-        self._call_event_handler(
-            'update_progress_update', step.progress_update_id,
-            new_current_step)
+        perc_step = self._perc_steps.get(
+                step.progress_update_id, None)
+        if perc_step is None:
+            return
+
+        if perc_step.last_value > new_current_step:
+            LOG.warn("rollback for perc update %s not allowed" % step.progress_update_id)
+            return
+
+        perc = 0
+        if perc_step.total_steps > 0 and new_current_step > 0:
+            perc = int(new_current_step * 100 // perc_step.total_steps)
+
+        if self._call_event_handler and perc > perc_step.last_perc:
+            self._call_event_handler(
+                'update_progress_update', step.progress_update_id,
+                new_current_step)
+            perc_id = copy.copy(step.progress_update_id)
+            total_steps = perc_step.total_steps
+            del self._perc_steps[step.progress_update_id]
+            del perc_step
+            self._perc_steps[perc_id] = _PercStepData(
+                perc_id, perc, 0, total_steps)
 
     def progress_update(self, message):
         self._call_event_handler(

+ 17 - 8
coriolis/providers/backup_writers.py

@@ -3,6 +3,7 @@
 
 import abc
 import contextlib
+import copy
 import datetime
 import errno
 import os
@@ -323,11 +324,12 @@ class SSHBackupWriterImpl(BaseBackupWriterImpl):
             data = self._sender_q.get()
             try:
                 self._send_msg(data)
-            except Exception as err:
+            except BaseException as err:
                 self._exception = err
                 raise
             finally:
                 self._sender_q.task_done()
+                del data
 
     def _encoder(self):
         while True:
@@ -338,7 +340,7 @@ class SSHBackupWriterImpl(BaseBackupWriterImpl):
                     payload["offset"],
                     payload["msg_id"])
                 self._sender_q.put(data)
-            except Exception as err:
+            except BaseException as err:
                 self._exception = err
                 raise
             finally:
@@ -632,9 +634,10 @@ class HTTPBackupWriterImpl(BaseBackupWriterImpl):
                         chunk, constants.COMPRESSION_FORMAT_GZIP)
                     if compressed:
                         send_payload["encoding"] = 'gzip'
-                except Exception as err:
+                except BaseException as err:
                     LOG.exception(err)
                     self._exception = err
+                    self._comp_q.task_done()
                     raise
             send_payload["chunk"] = chunk
             self._sender_q.put(send_payload)
@@ -643,18 +646,20 @@ class HTTPBackupWriterImpl(BaseBackupWriterImpl):
     def _sender(self):
         while True:
             payload = self._sender_q.get()
+            offset = copy.copy(payload["offset"])
             headers = {
-                "X-Write-Offset": str(payload["offset"]),
-                "X-Client-Token": self._id,
+                "X-Write-Offset": str(offset),
+                "X-Client-Token": copy.copy(self._id),
             }
             if payload.get("encoding", None):
-                headers["content-encoding"] = payload["encoding"]
+                enc = copy.copy(payload["encoding"])
+                headers["content-encoding"] = enc
 
             @utils.retry_on_error()
             def send():
                 self._ensure_session()
                 resp = self._session.post(
-                    self._uri, headers=headers, data=payload["chunk"],
+                    self._uri, headers=headers, data=copy.copy(payload["chunk"]),
                     timeout=CONF.default_requests_timeout
                 )
                 LOG.debug(
@@ -671,12 +676,16 @@ class HTTPBackupWriterImpl(BaseBackupWriterImpl):
                     raise
             try:
                 send()
-            except Exception as err:
+            except BaseException as err:
                 # record the exception. We need to terminate
                 # the writer if this is set
                 LOG.exception(err)
                 self._exception = err
+                self._sender_q.task_done()
                 raise
+            finally:
+                del headers
+                del payload
             self._sender_q.task_done()
 
     @utils.retry_on_error()

+ 33 - 26
coriolis/rpc.py

@@ -28,6 +28,7 @@ LOG = logging.getLogger(__name__)
 
 ALLOWED_EXMODS = [
     coriolis.exception.__name__]
+_TRANSPORT = None
 
 
 class RequestContextSerializer(messaging.Serializer):
@@ -65,6 +66,13 @@ def get_server(target, endpoints, serializer=None):
                                     serializer=serializer)
 
 
+def init():
+    global _TRANSPORT
+    if _TRANSPORT is None:
+        _TRANSPORT = _get_transport()
+    return _TRANSPORT
+
+
 class BaseRPCClient(object):
     """ Wrapper for 'oslo_messaging.RPCClient' which automatically
     instantiates and cleans up transports for each call.
@@ -76,43 +84,42 @@ class BaseRPCClient(object):
         if self._timeout is None:
             self._timeout = CONF.default_messaging_timeout
         self._serializer = RequestContextSerializer(serializer)
+        self._transport_conn = None
 
     def __repr__(self):
         return "<RPCClient(target=%s, timeout=%s)>" % (
             self._target, self._timeout)
 
-    @contextlib.contextmanager
-    def _rpc_messaging_client(self):
-        transport = None
-        try:
-            transport = _get_transport()
-            yield messaging.RPCClient(
-                transport, self._target, serializer=self._serializer,
+    @property
+    def _transport(self):
+        global _TRANSPORT
+        if _TRANSPORT is None:
+            if self._transport_conn is None:
+                self._transport_conn = _get_transport()
+            return self._transport_conn
+        else:
+            return _TRANSPORT
+
+    def _rpc_client(self):
+        return messaging.RPCClient(
+                self._transport, self._target,
+                serializer=self._serializer,
                 timeout=self._timeout)
-        finally:
-            if transport:
-                try:
-                    transport.cleanup()
-                except (Exception, KeyboardInterrupt):
-                    LOG.warn(
-                        "Exception occurred while cleaning up transport for "
-                        "RPC client instance '%s'. Error was: %s",
-                        repr(self), utils.get_exception_details())
 
     def _call(self, ctxt, method, **kwargs):
-        with self._rpc_messaging_client() as client:
-            return client.call(ctxt, method, **kwargs)
+        client = self._rpc_client()
+        return client.call(ctxt, method, **kwargs)
 
     def _call_on_host(self, host, ctxt, method, **kwargs):
-        with self._rpc_messaging_client() as client:
-            cctxt = client.prepare(server=host)
-            return cctxt.call(ctxt, method, **kwargs)
+        client = self._rpc_client()
+        cctxt = client.prepare(server=host)
+        return cctxt.call(ctxt, method, **kwargs)
 
     def _cast(self, ctxt, method, **kwargs):
-        with self._rpc_messaging_client() as client:
-            client.cast(ctxt, method, **kwargs)
+        client = self._rpc_client()
+        client.cast(ctxt, method, **kwargs)
 
     def _cast_for_host(self, host, ctxt, method, **kwargs):
-        with self._rpc_messaging_client() as client:
-            cctxt = client.prepare(server=host)
-            cctxt.cast(ctxt, method, **kwargs)
+        client = self._rpc_client()
+        cctxt = client.prepare(server=host)
+        cctxt.cast(ctxt, method, **kwargs)

+ 7 - 3
coriolis/service.py

@@ -51,7 +51,6 @@ def get_worker_count_from_args(argv):
         return count
     parser.add_argument(
         '--worker-process-count', metavar='N', type=_check_positive_worker_count,
-        default=processutils.get_worker_count(),
         help="Number of worker processes for this service. Defaults to the "
              "number of logical CPU cores on the system.")
     args, unknown_args = parser.parse_known_args(args=argv)
@@ -99,7 +98,9 @@ def check_locks_dir_empty():
 
 
 class WSGIService(service.ServiceBase):
-    def __init__(self, name, worker_count=None):
+    def __init__(self, name, worker_count=None, init_rpc=True):
+        if init_rpc:
+            rpc.init()
         self._host = CONF.api_migration_listen
         self._port = CONF.api_migration_listen_port
 
@@ -138,7 +139,10 @@ class WSGIService(service.ServiceBase):
 
 
 class MessagingService(service.ServiceBase):
-    def __init__(self, topic, endpoints, version, worker_count=None):
+    def __init__(self, topic, endpoints, version,
+                 worker_count=None, init_rpc=True):
+        if init_rpc:
+            rpc.init()
         target = messaging.Target(topic=topic,
                                   server=utils.get_hostname(),
                                   version=version)

+ 0 - 0
coriolis/tests/conductor/__init__.py


+ 0 - 0
coriolis/tests/conductor/rpc/__init__.py


+ 8 - 7
coriolis/tests/conductor/rpc/test_server.py

@@ -8,7 +8,7 @@ from unittest import mock
 
 from coriolis.conductor.rpc import server
 from coriolis import constants, exception
-from coriolis.tests import test_base
+from coriolis.tests import test_base, testutils
 
 
 @ddt.ddt
@@ -19,21 +19,22 @@ class ConductorServerEndpointTestCase(test_base.CoriolisBaseTestCase):
         super(ConductorServerEndpointTestCase, self).setUp()
         self.server = server.ConductorServerEndpoint()
 
-    @ddt.data({}, {mock.sentinel.instance: {}})
     @mock.patch.object(server.ConductorServerEndpoint, '_create_task')
     @mock.patch.object(server.ConductorServerEndpoint,
                        '_check_replica_running_executions')
     @mock.patch.object(server.ConductorServerEndpoint, '_get_replica')
-    def test_update_endpoint_not_found(self, replica_info, mock_get_replica,
-                                       mock_check_replica_running,
-                                       mock_create_task):
+    def test_delete_replica_disks_invalid_state(self, mock_get_replica,
+                                                mock_check_replica_running,
+                                                mock_create_task):
         mock_replica = mock_get_replica.return_value
         mock_replica.instances = [mock.sentinel.instance]
         mock_replica.info = {}
+        delete_replica_disks = testutils.get_wrapped_function(
+            self.server.delete_replica_disks)
 
         self.assertRaises(exception.InvalidReplicaState,
-                          self.server.delete_replica_disks,
-                          mock.sentinel.context, mock.sentinel.replica_id)
+                          delete_replica_disks,
+                          self.server, mock.sentinel.context, mock.sentinel.replica_id)
 
         mock_get_replica.assert_called_once_with(mock.sentinel.context,
                                                  mock.sentinel.replica_id)

+ 0 - 0
coriolis/tests/db/__init__.py


+ 2 - 25
coriolis/tests/db/test_api.py

@@ -5,30 +5,7 @@ from unittest import mock
 
 from coriolis.db import api
 from coriolis import exception
-from coriolis.tests import test_base
-
-
-def get_wrapped_function(function):
-    """Get the method at the bottom of a stack of decorators."""
-    if not hasattr(function, '__closure__') or not function.__closure__:
-        return function
-
-    def _get_wrapped_function(function):
-        if not hasattr(function, '__closure__') or not function.__closure__:
-            return None
-
-        for closure in function.__closure__:
-            func = closure.cell_contents
-
-            deeper_func = _get_wrapped_function(func)
-            if deeper_func:
-                return deeper_func
-            elif hasattr(closure.cell_contents, '__call__'):
-                return closure.cell_contents
-
-        return function
-
-    return _get_wrapped_function(function)
+from coriolis.tests import test_base, testutils
 
 
 class DBAPITestCase(test_base.CoriolisBaseTestCase):
@@ -41,7 +18,7 @@ class DBAPITestCase(test_base.CoriolisBaseTestCase):
         # We only need to test the unwrapped functions. Without this,
         # when calling a coriolis.db.api function, it will try to
         # establish an SQL connection.
-        update_endpoint = get_wrapped_function(api.update_endpoint)
+        update_endpoint = testutils.get_wrapped_function(api.update_endpoint)
 
         self.assertRaises(exception.NotFound, update_endpoint,
                           mock.sentinel.context, mock.sentinel.endpoint_id,

+ 2 - 2
coriolis/tests/test_schemas.py

@@ -73,7 +73,7 @@ class SchemasTestCase(test_base.CoriolisBaseTestCase):
 
         schemas.validate_value(test_value, test_schema)
 
-        mock_validate.assert_called_once_with(test_value, test_schema)
+        mock_validate.assert_called_once_with(test_value, test_schema, format_checker=None)
 
     @mock.patch.object(json, 'loads')
     @mock.patch.object(jsonschema, 'validate')
@@ -87,4 +87,4 @@ class SchemasTestCase(test_base.CoriolisBaseTestCase):
         schemas.validate_string(test_string, test_schema)
 
         mock_loads.assert_called_once_with(test_string)
-        mock_validate.assert_called_once_with(test_value, test_schema)
+        mock_validate.assert_called_once_with(test_value, test_schema, format_checker=None)

+ 23 - 0
coriolis/tests/testutils.py

@@ -11,3 +11,26 @@ def identity_dec(item, *args, **kwargs):
 def make_identity_decorator_mock():
     """Returns a MagicMock with identity_dec as a side-effect."""
     return mock.MagicMock(side_effect=identity_dec)
+
+
+def get_wrapped_function(function):
+    """Get the method at the bottom of a stack of decorators."""
+    if not hasattr(function, '__closure__') or not function.__closure__:
+        return function
+
+    def _get_wrapped_function(function):
+        if not hasattr(function, '__closure__') or not function.__closure__:
+            return None
+
+        for closure in function.__closure__:
+            func = closure.cell_contents
+
+            deeper_func = _get_wrapped_function(func)
+            if deeper_func:
+                return deeper_func
+            elif hasattr(closure.cell_contents, '__call__'):
+                return closure.cell_contents
+
+        return function
+
+    return _get_wrapped_function(function)