Преглед изворни кода

Add --worker-process-count argument to relevant components.

This patch adds a new command line argument ('--worker-process-count')
to the components which need it (API, Conductor, and Worker) to allow
for CLI-based control on the number of worker processes spawned.

Note that this will NOT stop the components from forking if they are
programmed to do so, but merely stops oslo_service from forking the main
process at initial startup.
Nashwan Azhari пре 5 година
родитељ
комит
ae0834f0b5
5 измењених фајлова са 47 додато и 13 уклоњено
  1. 4 3
      coriolis/cmd/api.py
  2. 3 3
      coriolis/cmd/conductor.py
  3. 3 3
      coriolis/cmd/worker.py
  4. 34 3
      coriolis/service.py
  5. 3 1
      coriolis/worker/rpc/server.py

+ 4 - 3
coriolis/cmd/api.py

@@ -12,11 +12,12 @@ CONF = cfg.CONF
 
 
 def main():
-    CONF(sys.argv[1:], project='coriolis',
-         version="1.0.0")
+    worker_count, args = service.get_worker_count_from_args(sys.argv)
+    CONF(args[1:], project='coriolis', version="1.0.0")
     utils.setup_logging()
 
-    server = service.WSGIService('coriolis-api')
+    server = service.WSGIService(
+        'coriolis-api', worker_count=worker_count)
     launcher = service.service.launch(
         CONF, server, workers=server.get_workers_count())
     launcher.wait()

+ 3 - 3
coriolis/cmd/conductor.py

@@ -14,15 +14,15 @@ CONF = cfg.CONF
 
 
 def main():
-    CONF(sys.argv[1:], project='coriolis',
-         version="1.0.0")
+    worker_count, args = service.get_worker_count_from_args(sys.argv)
+    CONF(args[1:], project='coriolis', version="1.0.0")
     utils.setup_logging()
     service.check_locks_dir_empty()
 
     server = service.MessagingService(
         constants.CONDUCTOR_MAIN_MESSAGING_TOPIC,
         [rpc_server.ConductorServerEndpoint()],
-        rpc_server.VERSION)
+        rpc_server.VERSION, worker_count=worker_count)
     launcher = service.service.launch(
         CONF, server, workers=server.get_workers_count())
     launcher.wait()

+ 3 - 3
coriolis/cmd/worker.py

@@ -14,14 +14,14 @@ CONF = cfg.CONF
 
 
 def main():
-    CONF(sys.argv[1:], project='coriolis',
-         version="1.0.0")
+    worker_count, args = service.get_worker_count_from_args(sys.argv)
+    CONF(args[1:], project='coriolis', version="1.0.0")
     utils.setup_logging()
 
     server = service.MessagingService(
         constants.WORKER_MAIN_MESSAGING_TOPIC,
         [rpc_server.WorkerServerEndpoint()],
-        rpc_server.VERSION)
+        rpc_server.VERSION, worker_count=worker_count)
     launcher = service.service.launch(
         CONF, server, workers=server.get_workers_count())
     launcher.wait()

+ 34 - 3
coriolis/service.py

@@ -1,6 +1,7 @@
 # Copyright 2016 Cloudbase Solutions Srl
 # All Rights Reserved.
 
+import argparse
 import os
 import platform
 
@@ -35,6 +36,28 @@ CONF.register_opts(service_opts)
 LOG = logging.getLogger(__name__)
 
 
+def get_worker_count_from_args(argv):
+    """ Parses the args for '--worker-process-count' and returns a tuple
+    containing the count (defaults to logical CPU count if
+    --worker-process-count is not present), as well as the unprocessed args.
+    """
+    parser = argparse.ArgumentParser()
+    def _check_positive_worker_count(worker_count):
+        count = int(worker_count)
+        if count <= 0:
+            raise argparse.ArgumentTypeError(
+                "Worker process count must be a strictly positive integer, "
+                "got: %s" % worker_count)
+        return count
+    parser.add_argument(
+        '--worker-process-count', metavar='N', type=_check_positive_worker_count,
+        default=processutils.get_worker_count(),
+        help="Number of worker processes for this service. Defaults to the "
+             "number of logical CPU cores on the system.")
+    args, unknown_args = parser.parse_known_args(args=argv)
+    return args.worker_process_count, unknown_args
+
+
 def check_locks_dir_empty():
     """ Checks whether the locks dir is empty and warns otherwise.
 
@@ -76,12 +99,15 @@ def check_locks_dir_empty():
 
 
 class WSGIService(service.ServiceBase):
-    def __init__(self, name):
+    def __init__(self, name, worker_count=None):
         self._host = CONF.api_migration_listen
         self._port = CONF.api_migration_listen_port
 
+        # NOTE: oslo_service fork()'s, which won't work on Windows...
         if platform.system() == "Windows":
             self._workers = 1
+        elif worker_count is not None:
+            self._workers = int(worker_count)
         else:
             self._workers = (
                 CONF.api_migration_workers or processutils.get_worker_count())
@@ -118,8 +144,13 @@ class MessagingService(service.ServiceBase):
                                   version=version)
         self._server = rpc.get_server(target, endpoints)
 
-        self._workers = (worker_count or CONF.messaging_workers or
-                         processutils.get_worker_count())
+        # NOTE: oslo_service fork()'s, which won't work on Windows...
+        if platform.system() == "Windows":
+            self._workers = 1
+        elif worker_count is not None:
+            self._workers = int(worker_count)
+        else:
+            self._workers = processutils.get_worker_count()
 
     def get_workers_count(self):
         return self._workers

+ 3 - 1
coriolis/worker/rpc/server.py

@@ -23,6 +23,7 @@ from coriolis import events
 from coriolis import exception
 from coriolis.providers import factory as providers_factory
 from coriolis import schemas
+from coriolis import service
 from coriolis.tasks import factory as task_runners_factory
 from coriolis import utils
 
@@ -624,7 +625,8 @@ class WorkerServerEndpoint(object):
 
 def _setup_task_process(mp_log_q):
     # Setting up logging and cfg, needed since this is a new process
-    cfg.CONF(sys.argv[1:], project='coriolis', version="1.0.0")
+    _, args = service.get_worker_count_from_args(sys.argv)
+    cfg.CONF(args[1:], project='coriolis', version="1.0.0")
     utils.setup_logging()
 
     # Log events need to be handled in the parent process