Просмотр исходного кода

Fix scheduled transfers not triggering on cron schedule

The transfer-cron service started its cron loop in the
TransferCronServerEndpoint constructor, which runs in the parent process
before oslo_service forks its worker. Since oslo_service forks a worker
even with workers=1, the cron loop ran in the parent while the RPC
register/unregister handlers ran in the forked child. Each process kept
its own in-memory job registry, so schedules created or updated at
runtime were added to the child's registry while the parent's loop kept
checking an empty one, and scheduled transfers never started.

This was caused by the migration from eventlet to threading,
which introduced the parent/child process split for this service.
Fabian Fulga 1 неделя назад
Родитель
Коммит
7fbadcc6c1
2 изменённых файла: 42 добавления и 2 удаления
  1. 23 1
      coriolis/tests/transfer_cron/rpc/test_server.py
  2. 19 1
      coriolis/transfer_cron/rpc/server.py

+ 23 - 1
coriolis/tests/transfer_cron/rpc/test_server.py

@@ -56,9 +56,31 @@ class TransferCronServerEndpointTestCase(test_base.CoriolisBaseTestCase):
     """Test suite for the Coriolis TransferCronServerEndpoint class."""
     """Test suite for the Coriolis TransferCronServerEndpoint class."""
 
 
     @mock.patch.object(server.TransferCronServerEndpoint, '_init_cron')
     @mock.patch.object(server.TransferCronServerEndpoint, '_init_cron')
-    def setUp(self, _):
+    @mock.patch.object(server.os, 'register_at_fork')
+    def setUp(self, mock_register_at_fork, _):
         super(TransferCronServerEndpointTestCase, self).setUp()
         super(TransferCronServerEndpointTestCase, self).setUp()
         self.server = server.TransferCronServerEndpoint()
         self.server = server.TransferCronServerEndpoint()
+        # Pretend cron is already running so register/unregister don't
+        # trigger a lazy _init_cron() during these isolated tests.
+        self.server._cron_started = True
+
+    @mock.patch.object(server.os, 'register_at_fork')
+    @mock.patch.object(server.TransferCronServerEndpoint, '_init_cron')
+    def test_init_defers_cron_to_after_fork(
+            self, mock_init_cron, mock_register_at_fork):
+        srv = server.TransferCronServerEndpoint()
+
+        mock_init_cron.assert_not_called()
+        mock_register_at_fork.assert_called_once_with(
+            after_in_child=srv._ensure_cron_started)
+
+    @mock.patch.object(server.TransferCronServerEndpoint, '_init_cron')
+    def test_ensure_cron_started_is_idempotent(self, mock_init_cron):
+        self.server._cron_started = False
+        self.server._ensure_cron_started()
+        self.server._ensure_cron_started()
+
+        mock_init_cron.assert_called_once()
 
 
     @ddt.data(
     @ddt.data(
         {
         {

+ 19 - 1
coriolis/transfer_cron/rpc/server.py

@@ -2,6 +2,8 @@
 # All Rights Reserved.
 # All Rights Reserved.
 
 
 import json
 import json
+import os
+import threading
 
 
 from oslo_log import log as logging
 from oslo_log import log as logging
 from oslo_utils import timeutils
 from oslo_utils import timeutils
@@ -38,7 +40,21 @@ class TransferCronServerEndpoint(object):
         # Setup cron loop
         # Setup cron loop
         self._cron = cron.Cron()
         self._cron = cron.Cron()
         self._admin_ctx = context.get_admin_context()
         self._admin_ctx = context.get_admin_context()
-        self._init_cron()
+        self._cron_lock = threading.Lock()
+        self._cron_started = False
+        # NOTE (fabi200123): oslo_service forks worker processes even when
+        # workers=1. The cron loop must run in the same process as the RPC
+        # handlers that register/unregister jobs, otherwise the loop checks
+        # a job registry in the parent process while registrations land in
+        # the forked child. Defer cron startup until after the fork.
+        os.register_at_fork(after_in_child=self._ensure_cron_started)
+
+    def _ensure_cron_started(self):
+        with self._cron_lock:
+            if self._cron_started:
+                return
+            self._init_cron()
+            self._cron_started = True
 
 
     def _deserialize_schedule(self, sched):
     def _deserialize_schedule(self, sched):
         expires = sched.get("expiration_date")
         expires = sched.get("expiration_date")
@@ -88,12 +104,14 @@ class TransferCronServerEndpoint(object):
         return schedules
         return schedules
 
 
     def register(self, ctxt, schedule):
     def register(self, ctxt, schedule):
+        self._ensure_cron_started()
         now = timeutils.utcnow()
         now = timeutils.utcnow()
         LOG.debug("Registering new schedule %s: %r" % (
         LOG.debug("Registering new schedule %s: %r" % (
             schedule["id"], schedule["schedule"]))
             schedule["id"], schedule["schedule"]))
         self._register_schedule(schedule, date=now)
         self._register_schedule(schedule, date=now)
 
 
     def unregister(self, ctxt, schedule):
     def unregister(self, ctxt, schedule):
+        self._ensure_cron_started()
         schedule_id = schedule["id"]
         schedule_id = schedule["id"]
         LOG.debug("removing schedule %s" % schedule_id)
         LOG.debug("removing schedule %s" % schedule_id)
         self._cron.unregister(schedule_id)
         self._cron.unregister(schedule_id)