From 4a8a452e85b198790978e1ada15a4713538db750 Mon Sep 17 00:00:00 2001 From: Guy Av <47632673+GuyAv46@users.noreply.github.com> Date: Thu, 23 Apr 2026 15:54:36 +0300 Subject: [PATCH 1/6] Fix parallel-coordinator hang when summary pipe saturates The coordinator drained the 'summary' queue only after joining all worker processes. With enough queued data (or a single large testsFailed dict), the summary-pipe buffer (~64 KiB on Linux) saturates and worker feeder threads block in pipe_write, both inside on_timeout's join_thread() and during Python's end-of-process queue finalization. This in turn hangs the coordinator's p.join() indefinitely. Introduce a module-level helper _join_workers_with_summary_drain that joins workers while continuously draining 'summary' from a background thread, and use it in execute(). Also correct the stale comment in the on_timeout closure to describe the actual watcher-thread os._exit(1) flow. --- RLTest/__main__.py | 57 ++++++++++++++++---- tests/unit/test_parallel_drain.py | 86 +++++++++++++++++++++++++++++++ 2 files changed, 133 insertions(+), 10 deletions(-) create mode 100644 tests/unit/test_parallel_drain.py diff --git a/RLTest/__main__.py b/RLTest/__main__.py index 3cfae28..13ab0a1 100644 --- a/RLTest/__main__.py +++ b/RLTest/__main__.py @@ -357,6 +357,44 @@ def __enter__(self): def __exit__(self, type, value, traceback): self.runner.takeEnvDown() +def _join_workers_with_summary_drain(processes, summary, timeout=None): + """Wait for all worker processes to exit while continuously draining the + ``summary`` queue, and return the list of collected summary entries. + + A background thread drains ``summary`` so worker feeder threads never + block writing to a full summary-pipe buffer. Without this, + ``on_timeout``'s ``summary.join_thread()`` and Python's end-of-process + queue finalization can both block in ``pipe_write``, in turn hanging + ``p.join()`` here indefinitely. + """ + stop = threading.Event() + collected = [] + + def _drain(): + while not stop.is_set(): + try: + collected.append(summary.get(timeout=0.1)) + except Exception: + pass + + drainer = threading.Thread(target=_drain) + drainer.start() + try: + deadline = None if timeout is None else time.time() + timeout + for p in processes: + remaining = None if deadline is None else max(0.0, deadline - time.time()) + p.join(timeout=remaining) + finally: + stop.set() + drainer.join() + while True: + try: + collected.append(summary.get_nowait()) + except Exception: + break + return collected + + class TestTimeLimit(object): """ A test timeout watcher. The watcher opens thread that sleep for the @@ -963,7 +1001,12 @@ def on_timeout(): finally: results.put({'test_name': test.name, "output": output.getvalue()}, block=False) summary.put({'done': done, 'failures': self.testsFailed}, block=False) - # After we return the processes will be killed, so we must make sure the queues are drained properly. + # The watcher thread calls os._exit(1) immediately after this + # closure returns, bypassing Python finalization. Close the + # queues and join their feeder threads here so pending put()s + # are flushed to the pipes first. (The coordinator drains the + # summary queue concurrently, which prevents join_thread() from + # blocking on a full pipe.) results.close() summary.close() summary.join_thread() @@ -1008,15 +1051,9 @@ def on_timeout(): output = res['output'] print('%s' % output, end="") - for p in processes: - p.join() - - # join results - while True: - try: - res = summary.get(timeout=1) - except Exception as e: - break + # Join worker processes while concurrently draining `summary`, + # so their feeder threads do not block on a full pipe buffer. + for res in _join_workers_with_summary_drain(processes, summary): done += res['done'] self.testsFailed.update(res['failures']) diff --git a/tests/unit/test_parallel_drain.py b/tests/unit/test_parallel_drain.py new file mode 100644 index 0000000..cce75a0 --- /dev/null +++ b/tests/unit/test_parallel_drain.py @@ -0,0 +1,86 @@ +"""Regression test for a hang in the parallel test coordinator. + +Prior to the fix, the coordinator joined all worker processes before draining +the ``summary`` queue. With enough data queued (or a single large +``self.testsFailed`` dict), the summary pipe buffer (~64 KiB on Linux) +saturates and worker feeder threads block in ``pipe_write`` during Python's +end-of-process queue finalization (and similarly inside ``on_timeout``'s +``summary.join_thread()``). That causes the coordinator's ``p.join()`` to +hang indefinitely. + +The fix drains ``summary`` from a background thread while workers are being +joined. This test reproduces the saturation scenario and asserts the helper +completes within a bounded time with every worker cleanly exited. +""" + +import multiprocessing as mp +import sys +import time +from unittest import TestCase + +from RLTest.__main__ import _join_workers_with_summary_drain + + +# ~32 KiB per message × 8 workers = 256 KiB total, comfortably exceeding the +# typical 64 KiB pipe buffer on Linux, so at least some feeder threads will +# block on ``pipe_write`` unless the parent is actively reading. +_PAYLOAD_BYTES = 32 * 1024 +_NUM_WORKERS = 8 +_JOIN_TIMEOUT_SECS = 30.0 + + +def _worker_puts_large_summary(summary): + summary.put({ + 'done': 1, + 'failures': {}, + 'payload': 'x' * _PAYLOAD_BYTES, + }) + # Return normally; Python finalization will join the feeder thread, + # which is where a non-draining parent would cause the hang. + + +class TestJoinWorkersWithSummaryDrain(TestCase): + + def setUp(self): + if sys.platform == 'win32': + self.skipTest('fork start method is unavailable on Windows') + self._ctx = mp.get_context('fork') + self._procs = [] + self._summary = None + + def tearDown(self): + # Safety net: if the helper ever hangs despite the fix, make sure the + # pytest session can still exit cleanly. + for p in self._procs: + if p.is_alive(): + p.kill() + p.join(timeout=5) + + def test_large_summary_does_not_hang(self): + self._summary = self._ctx.Queue() + self._procs = [ + self._ctx.Process( + target=_worker_puts_large_summary, + args=(self._summary,), + ) + for _ in range(_NUM_WORKERS) + ] + for p in self._procs: + p.start() + + start = time.time() + collected = _join_workers_with_summary_drain( + self._procs, self._summary, timeout=_JOIN_TIMEOUT_SECS, + ) + elapsed = time.time() - start + + for p in self._procs: + self.assertFalse( + p.is_alive(), + 'worker still alive after drain-join; summary pipe likely saturated', + ) + self.assertEqual(p.exitcode, 0) + self.assertEqual(len(collected), _NUM_WORKERS) + # The helper should return well under its own timeout; we only assert a + # loose upper bound to avoid flakiness on slow machines. + self.assertLess(elapsed, _JOIN_TIMEOUT_SECS) From c8df5bcb53506d40779fe1a92d68a18c7dcb8135 Mon Sep 17 00:00:00 2001 From: Guy Av <47632673+GuyAv46@users.noreply.github.com> Date: Thu, 23 Apr 2026 15:57:10 +0300 Subject: [PATCH 2/6] Use SimpleQueue for summary in parallel coordinator SimpleQueue.put is synchronous (no feeder thread, no internal buffer), so a successful put() implies the bytes are already in the kernel pipe. That removes the need for the summary.close() + summary.join_thread() dance in on_timeout before the watcher's os._exit(1), and the comment that explained it. The coordinator-side drain thread is updated to a blocking get() driven by a sentinel on shutdown, eliminating its busy-loop timeout too. results stays a Queue because the progressbar liveness loop relies on get(timeout=...), which SimpleQueue does not expose publicly. --- RLTest/__main__.py | 54 ++++++++++++++----------------- tests/unit/test_parallel_drain.py | 21 ++++++------ 2 files changed, 35 insertions(+), 40 deletions(-) diff --git a/RLTest/__main__.py b/RLTest/__main__.py index 13ab0a1..1b27ffa 100644 --- a/RLTest/__main__.py +++ b/RLTest/__main__.py @@ -12,7 +12,7 @@ import time import shlex import json -from multiprocessing import Process, Queue, set_start_method +from multiprocessing import Process, Queue, SimpleQueue, set_start_method from RLTest.env import Env, TestAssertionFailure, Defaults from RLTest.utils import Colors, fix_modules, fix_modulesArgs, is_github_actions @@ -357,25 +357,29 @@ def __enter__(self): def __exit__(self, type, value, traceback): self.runner.takeEnvDown() +# Sentinel used to signal the summary-drainer thread to stop. Must be +# pickleable (SimpleQueue pickles everything) and distinguishable from any +# payload a worker might put, which is always a dict. +_SUMMARY_DRAIN_STOP = ('__rltest_summary_stop__',) + + def _join_workers_with_summary_drain(processes, summary, timeout=None): """Wait for all worker processes to exit while continuously draining the ``summary`` queue, and return the list of collected summary entries. - A background thread drains ``summary`` so worker feeder threads never - block writing to a full summary-pipe buffer. Without this, - ``on_timeout``'s ``summary.join_thread()`` and Python's end-of-process - queue finalization can both block in ``pipe_write``, in turn hanging - ``p.join()`` here indefinitely. + A background thread drains ``summary`` so that worker ``put()`` calls + never block on a full summary-pipe buffer. Without this, on_timeout and + the final summary.put at worker-exit can block in ``pipe_write``, in + turn hanging ``p.join()`` here indefinitely. """ - stop = threading.Event() collected = [] def _drain(): - while not stop.is_set(): - try: - collected.append(summary.get(timeout=0.1)) - except Exception: - pass + while True: + item = summary.get() + if item == _SUMMARY_DRAIN_STOP: + break + collected.append(item) drainer = threading.Thread(target=_drain) drainer.start() @@ -385,13 +389,8 @@ def _drain(): remaining = None if deadline is None else max(0.0, deadline - time.time()) p.join(timeout=remaining) finally: - stop.set() + summary.put(_SUMMARY_DRAIN_STOP) drainer.join() - while True: - try: - collected.append(summary.get_nowait()) - except Exception: - break return collected @@ -999,17 +998,14 @@ def on_timeout(): except Exception as e: self.handleFailure(testFullName=test.name, testname=test.name, error_msg=Colors.Bred('Exception on timeout function %s' % str(e))) finally: + # `summary` is a SimpleQueue, so its put() writes straight to + # the pipe and needs no explicit flush. `results` is a Queue + # with a feeder thread; close() + join_thread() ensures the + # put above is written to the pipe before the watcher thread + # calls os._exit(1), which bypasses Python finalization. + summary.put({'done': done, 'failures': self.testsFailed}) results.put({'test_name': test.name, "output": output.getvalue()}, block=False) - summary.put({'done': done, 'failures': self.testsFailed}, block=False) - # The watcher thread calls os._exit(1) immediately after this - # closure returns, bypassing Python finalization. Close the - # queues and join their feeder threads here so pending put()s - # are flushed to the pipes first. (The coordinator drains the - # summary queue concurrently, which prevents join_thread() from - # blocking on a full pipe.) results.close() - summary.close() - summary.join_thread() results.join_thread() done += self.run_single_test(test, on_timeout) @@ -1018,10 +1014,10 @@ def on_timeout(): self.takeEnvDown(fullShutDown=True) # serialized the results back - summary.put({'done': done, 'failures': self.testsFailed}, block=False) + summary.put({'done': done, 'failures': self.testsFailed}) results = Queue() - summary = Queue() + summary = SimpleQueue() # Open group for all tests at the start (parallel execution) self._openGitHubActionsTestsGroup() if self.parallelism == 1: diff --git a/tests/unit/test_parallel_drain.py b/tests/unit/test_parallel_drain.py index cce75a0..b1f19cc 100644 --- a/tests/unit/test_parallel_drain.py +++ b/tests/unit/test_parallel_drain.py @@ -1,16 +1,14 @@ """Regression test for a hang in the parallel test coordinator. -Prior to the fix, the coordinator joined all worker processes before draining +Prior to the fix, the coordinator joined all worker processes before reading the ``summary`` queue. With enough data queued (or a single large ``self.testsFailed`` dict), the summary pipe buffer (~64 KiB on Linux) -saturates and worker feeder threads block in ``pipe_write`` during Python's -end-of-process queue finalization (and similarly inside ``on_timeout``'s -``summary.join_thread()``). That causes the coordinator's ``p.join()`` to -hang indefinitely. +saturates and worker ``put()``s block in ``pipe_write``, which in turn hangs +the coordinator's ``p.join()`` indefinitely. The fix drains ``summary`` from a background thread while workers are being joined. This test reproduces the saturation scenario and asserts the helper -completes within a bounded time with every worker cleanly exited. +completes with every worker cleanly exited. """ import multiprocessing as mp @@ -22,21 +20,22 @@ # ~32 KiB per message × 8 workers = 256 KiB total, comfortably exceeding the -# typical 64 KiB pipe buffer on Linux, so at least some feeder threads will -# block on ``pipe_write`` unless the parent is actively reading. +# typical 64 KiB pipe buffer on Linux, so at least some writers will block +# on ``pipe_write`` unless the parent is actively reading. _PAYLOAD_BYTES = 32 * 1024 _NUM_WORKERS = 8 _JOIN_TIMEOUT_SECS = 30.0 def _worker_puts_large_summary(summary): + # SimpleQueue.put writes synchronously to the pipe: without a parallel + # drain on the parent side, this call itself blocks forever once the + # pipe saturates. summary.put({ 'done': 1, 'failures': {}, 'payload': 'x' * _PAYLOAD_BYTES, }) - # Return normally; Python finalization will join the feeder thread, - # which is where a non-draining parent would cause the hang. class TestJoinWorkersWithSummaryDrain(TestCase): @@ -57,7 +56,7 @@ def tearDown(self): p.join(timeout=5) def test_large_summary_does_not_hang(self): - self._summary = self._ctx.Queue() + self._summary = self._ctx.SimpleQueue() self._procs = [ self._ctx.Process( target=_worker_puts_large_summary, From c0cb696cb7cb5a714abb78a16135247965595a76 Mon Sep 17 00:00:00 2001 From: Guy Av <47632673+GuyAv46@users.noreply.github.com> Date: Sun, 26 Apr 2026 09:25:23 +0300 Subject: [PATCH 3/6] Address review: reorder puts in on_timeout, fix stale comment In the on_timeout closure, put 'results' before 'summary'. The summary queue is a SimpleQueue with a synchronous put(), and the coordinator only starts draining it after every result is in. Putting summary first risked blocking on a full summary pipe while the coordinator was still waiting on this worker's result, which would have stalled the whole results-collection loop. Putting results first guarantees the worker's output reaches the coordinator unconditionally; the subsequent summary put may briefly block but always unblocks once the coordinator moves to the drain phase. Also drop the stale 'feeder threads' wording near the call site: the summary queue no longer has a feeder thread. --- RLTest/__main__.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/RLTest/__main__.py b/RLTest/__main__.py index 1b27ffa..d23772d 100644 --- a/RLTest/__main__.py +++ b/RLTest/__main__.py @@ -998,15 +998,20 @@ def on_timeout(): except Exception as e: self.handleFailure(testFullName=test.name, testname=test.name, error_msg=Colors.Bred('Exception on timeout function %s' % str(e))) finally: - # `summary` is a SimpleQueue, so its put() writes straight to - # the pipe and needs no explicit flush. `results` is a Queue - # with a feeder thread; close() + join_thread() ensures the - # put above is written to the pipe before the watcher thread - # calls os._exit(1), which bypasses Python finalization. - summary.put({'done': done, 'failures': self.testsFailed}) + # Order matters: `results` first so the coordinator (which is + # actively draining `results`) gets this worker's output and + # can progress toward the summary-drain phase. `summary` is a + # SimpleQueue with a synchronous put(); the coordinator only + # starts draining it after every result is in, so this put may + # briefly block here on a full pipe and that's fine. + # `results` is a Queue with a feeder thread; close() + + # join_thread() flushes the put above to the pipe before the + # watcher thread calls os._exit(1), which bypasses Python + # finalization. results.put({'test_name': test.name, "output": output.getvalue()}, block=False) results.close() results.join_thread() + summary.put({'done': done, 'failures': self.testsFailed}) done += self.run_single_test(test, on_timeout) results.put({'test_name': test.name, "output": output.getvalue()}, block=False) @@ -1047,8 +1052,8 @@ def on_timeout(): output = res['output'] print('%s' % output, end="") - # Join worker processes while concurrently draining `summary`, - # so their feeder threads do not block on a full pipe buffer. + # Join worker processes while concurrently draining `summary`, so + # workers do not block on its pipe buffer filling up. for res in _join_workers_with_summary_drain(processes, summary): done += res['done'] self.testsFailed.update(res['failures']) From dfc911468e058eef58a4d86ce7c742bb26aceb4d Mon Sep 17 00:00:00 2001 From: Guy Av <47632673+GuyAv46@users.noreply.github.com> Date: Sun, 26 Apr 2026 10:33:16 +0300 Subject: [PATCH 4/6] Collapse parallel results+summary into a single queue The parallel coordinator used two queues: 'results' for per-test output (read by the progressbar loop) and 'summary' for per-worker aggregates ('done' count and the worker's full testsFailed dict, read after the loop). Workers' summary.put could block on a full pipe because the coordinator only drained summary in a second phase, after every results message had been received. Collapse to a single 'results' queue carrying one self-contained message per test: { test_name, output, done, failures }. The worker resets self.testsFailed = {} before each test so addFailure() writes into a fresh dict that ships verbatim; the worker keeps no cumulative state. The coordinator owns the canonical testsFailed via update() per message. This eliminates the deadlock by construction: the only queue is drained continuously by the coordinator's progressbar loop for the entire lifetime of the workers, so worker put()s can never block on a full pipe. Removes the SimpleQueue import, the _SUMMARY_DRAIN_STOP sentinel, and the _join_workers_with_summary_drain helper. on_timeout shrinks to a single put + close + join_thread. The unit test is updated to exercise the new pattern: workers push many large per-test messages while the main thread drains them live. --- RLTest/__main__.py | 90 ++++++++----------------------- tests/unit/test_parallel_drain.py | 71 ++++++++++++------------ 2 files changed, 59 insertions(+), 102 deletions(-) diff --git a/RLTest/__main__.py b/RLTest/__main__.py index d23772d..7bb45a3 100644 --- a/RLTest/__main__.py +++ b/RLTest/__main__.py @@ -12,7 +12,7 @@ import time import shlex import json -from multiprocessing import Process, Queue, SimpleQueue, set_start_method +from multiprocessing import Process, Queue, set_start_method from RLTest.env import Env, TestAssertionFailure, Defaults from RLTest.utils import Colors, fix_modules, fix_modulesArgs, is_github_actions @@ -357,43 +357,6 @@ def __enter__(self): def __exit__(self, type, value, traceback): self.runner.takeEnvDown() -# Sentinel used to signal the summary-drainer thread to stop. Must be -# pickleable (SimpleQueue pickles everything) and distinguishable from any -# payload a worker might put, which is always a dict. -_SUMMARY_DRAIN_STOP = ('__rltest_summary_stop__',) - - -def _join_workers_with_summary_drain(processes, summary, timeout=None): - """Wait for all worker processes to exit while continuously draining the - ``summary`` queue, and return the list of collected summary entries. - - A background thread drains ``summary`` so that worker ``put()`` calls - never block on a full summary-pipe buffer. Without this, on_timeout and - the final summary.put at worker-exit can block in ``pipe_write``, in - turn hanging ``p.join()`` here indefinitely. - """ - collected = [] - - def _drain(): - while True: - item = summary.get() - if item == _SUMMARY_DRAIN_STOP: - break - collected.append(item) - - drainer = threading.Thread(target=_drain) - drainer.start() - try: - deadline = None if timeout is None else time.time() + timeout - for p in processes: - remaining = None if deadline is None else max(0.0, deadline - time.time()) - p.join(timeout=remaining) - finally: - summary.put(_SUMMARY_DRAIN_STOP) - drainer.join() - return collected - - class TestTimeLimit(object): """ A test timeout watcher. The watcher opens thread that sleep for the @@ -978,51 +941,43 @@ def on_timeout(): self.takeEnvDown(fullShutDown=True) - def run_jobs(jobs, results, summary, port): + def run_jobs(jobs, results, port): Defaults.port = port - done = 0 while True: try: test = jobs.get(timeout=0.1) except Exception as e: break + # Reset per-test: addFailure() in this worker writes into this + # dict, which is shipped as-is. The coordinator owns the + # cumulative testsFailed; the worker keeps no state across tests. + self.testsFailed = {} output = io.StringIO() with redirect_stdout(output): def on_timeout(): - nonlocal done try: - done += 1 self.killEnvWithSegFault() self.handleFailure(testFullName=test.name, testname=test.name, error_msg=Colors.Bred('Test timeout')) except Exception as e: self.handleFailure(testFullName=test.name, testname=test.name, error_msg=Colors.Bred('Exception on timeout function %s' % str(e))) finally: - # Order matters: `results` first so the coordinator (which is - # actively draining `results`) gets this worker's output and - # can progress toward the summary-drain phase. `summary` is a - # SimpleQueue with a synchronous put(); the coordinator only - # starts draining it after every result is in, so this put may - # briefly block here on a full pipe and that's fine. - # `results` is a Queue with a feeder thread; close() + - # join_thread() flushes the put above to the pipe before the - # watcher thread calls os._exit(1), which bypasses Python - # finalization. - results.put({'test_name': test.name, "output": output.getvalue()}, block=False) + # The watcher thread calls os._exit(1) right after this + # returns, bypassing Python finalization. close() + + # join_thread() flushes the put to the pipe first. + results.put({'test_name': test.name, 'output': output.getvalue(), + 'done': 1, 'failures': self.testsFailed}, block=False) results.close() results.join_thread() - summary.put({'done': done, 'failures': self.testsFailed}) - done += self.run_single_test(test, on_timeout) - results.put({'test_name': test.name, "output": output.getvalue()}, block=False) + done_delta = self.run_single_test(test, on_timeout) - self.takeEnvDown(fullShutDown=True) + results.put({'test_name': test.name, 'output': output.getvalue(), + 'done': done_delta, 'failures': self.testsFailed}, block=False) - # serialized the results back - summary.put({'done': done, 'failures': self.testsFailed}) + self.takeEnvDown(fullShutDown=True) results = Queue() - summary = SimpleQueue() # Open group for all tests at the start (parallel execution) self._openGitHubActionsTestsGroup() if self.parallelism == 1: @@ -1031,7 +986,7 @@ def on_timeout(): processes = [] currPort = Defaults.port for i in range(self.parallelism): - p = Process(target=run_jobs, args=(jobs,results,summary,currPort)) + p = Process(target=run_jobs, args=(jobs,results,currPort)) currPort += 30 # safe distance for cluster and replicas processes.append(p) p.start() @@ -1049,15 +1004,16 @@ def on_timeout(): except Exception as e: if not has_live_processor: raise Exception('Failed to get job result and no more processors is alive') - output = res['output'] - print('%s' % output, end="") - - # Join worker processes while concurrently draining `summary`, so - # workers do not block on its pipe buffer filling up. - for res in _join_workers_with_summary_drain(processes, summary): + print('%s' % res['output'], end="") done += res['done'] self.testsFailed.update(res['failures']) + # All per-test messages are accounted for; workers exit on their own + # once `jobs` drains. The single results queue is drained continuously + # above, so workers can never block on a full pipe. + for p in processes: + p.join() + endTime = time.time() # Close group after all tests complete (parallel execution) diff --git a/tests/unit/test_parallel_drain.py b/tests/unit/test_parallel_drain.py index b1f19cc..888a460 100644 --- a/tests/unit/test_parallel_drain.py +++ b/tests/unit/test_parallel_drain.py @@ -1,14 +1,15 @@ """Regression test for a hang in the parallel test coordinator. -Prior to the fix, the coordinator joined all worker processes before reading -the ``summary`` queue. With enough data queued (or a single large -``self.testsFailed`` dict), the summary pipe buffer (~64 KiB on Linux) -saturates and worker ``put()``s block in ``pipe_write``, which in turn hangs -the coordinator's ``p.join()`` indefinitely. +The parallel coordinator uses a single ``results`` queue carrying one message +per test. Workers push these messages while the coordinator drains them in its +progressbar loop. If the coordinator ever stops draining before workers stop +pushing, large per-test outputs saturate the pipe (~64 KiB on Linux), worker +``put()`` calls block in ``pipe_write``, and ``p.join()`` hangs indefinitely. -The fix drains ``summary`` from a background thread while workers are being -joined. This test reproduces the saturation scenario and asserts the helper -completes with every worker cleanly exited. +This test reproduces the saturation scenario by spawning workers that each +push many large messages, then asserts that a coordinator that drains +continuously throughout the workers' lifetime finishes promptly with every +message accounted for and every worker cleanly exited. """ import multiprocessing as mp @@ -16,70 +17,70 @@ import time from unittest import TestCase -from RLTest.__main__ import _join_workers_with_summary_drain - -# ~32 KiB per message × 8 workers = 256 KiB total, comfortably exceeding the -# typical 64 KiB pipe buffer on Linux, so at least some writers will block -# on ``pipe_write`` unless the parent is actively reading. +# ~32 KiB per message × 8 workers × 8 messages = 2 MiB total, well over the +# typical 64 KiB pipe buffer on Linux, so writers will block on ``pipe_write`` +# unless the parent is actively reading throughout. _PAYLOAD_BYTES = 32 * 1024 _NUM_WORKERS = 8 +_MSGS_PER_WORKER = 8 _JOIN_TIMEOUT_SECS = 30.0 -def _worker_puts_large_summary(summary): - # SimpleQueue.put writes synchronously to the pipe: without a parallel - # drain on the parent side, this call itself blocks forever once the - # pipe saturates. - summary.put({ - 'done': 1, - 'failures': {}, - 'payload': 'x' * _PAYLOAD_BYTES, - }) +def _worker_puts_many_results(results, n_msgs, payload_bytes): + # Queue.put is async (via a feeder thread), but at process exit the feeder + # must flush the buffered items to the pipe before the worker can exit. If + # the parent is not draining, that flush blocks forever. + payload = 'x' * payload_bytes + for i in range(n_msgs): + results.put({'test_name': 't%d' % i, 'output': payload, + 'done': 1, 'failures': {}}) -class TestJoinWorkersWithSummaryDrain(TestCase): +class TestParallelResultsDrain(TestCase): def setUp(self): if sys.platform == 'win32': self.skipTest('fork start method is unavailable on Windows') self._ctx = mp.get_context('fork') self._procs = [] - self._summary = None def tearDown(self): - # Safety net: if the helper ever hangs despite the fix, make sure the + # Safety net: if the test ever hangs despite the fix, make sure the # pytest session can still exit cleanly. for p in self._procs: if p.is_alive(): p.kill() p.join(timeout=5) - def test_large_summary_does_not_hang(self): - self._summary = self._ctx.SimpleQueue() + def test_continuous_drain_does_not_hang(self): + results = self._ctx.Queue() self._procs = [ self._ctx.Process( - target=_worker_puts_large_summary, - args=(self._summary,), + target=_worker_puts_many_results, + args=(results, _MSGS_PER_WORKER, _PAYLOAD_BYTES), ) for _ in range(_NUM_WORKERS) ] for p in self._procs: p.start() + # Mimic the coordinator's progressbar loop: drain every per-test + # message live, in the same thread, while workers are still running. + expected = _NUM_WORKERS * _MSGS_PER_WORKER start = time.time() - collected = _join_workers_with_summary_drain( - self._procs, self._summary, timeout=_JOIN_TIMEOUT_SECS, - ) + collected = [results.get(timeout=_JOIN_TIMEOUT_SECS) for _ in range(expected)] + for p in self._procs: + p.join(timeout=_JOIN_TIMEOUT_SECS) elapsed = time.time() - start for p in self._procs: self.assertFalse( p.is_alive(), - 'worker still alive after drain-join; summary pipe likely saturated', + 'worker still alive after join; results pipe likely saturated', ) self.assertEqual(p.exitcode, 0) - self.assertEqual(len(collected), _NUM_WORKERS) - # The helper should return well under its own timeout; we only assert a + self.assertEqual(len(collected), expected) + # The drain should return well under its own timeout; we only assert a # loose upper bound to avoid flakiness on slow machines. self.assertLess(elapsed, _JOIN_TIMEOUT_SECS) From 3a50d5a35fee44eb432490a5de7f88e032765162 Mon Sep 17 00:00:00 2001 From: GuyAv46 Date: Sun, 26 Apr 2026 11:32:06 +0300 Subject: [PATCH 5/6] Enhance parallel test execution: add shutdown messages and improve result handling --- RLTest/__main__.py | 49 ++++++++++++++++++++++++++++++---------------- 1 file changed, 32 insertions(+), 17 deletions(-) diff --git a/RLTest/__main__.py b/RLTest/__main__.py index 7bb45a3..44b88d8 100644 --- a/RLTest/__main__.py +++ b/RLTest/__main__.py @@ -973,9 +973,18 @@ def on_timeout(): done_delta = self.run_single_test(test, on_timeout) results.put({'test_name': test.name, 'output': output.getvalue(), - 'done': done_delta, 'failures': self.testsFailed}, block=False) - + 'done': done_delta, 'failures': self.testsFailed, + 'shutdown': False}, block=False) + + # Always ship one shutdown message per worker so the coordinator + # reads a known total of n_jobs + parallelism messages. Captures + # failures raised during final shutdown (e.g. "redis did not exit + # cleanly" when env_reuse=True). + self.testsFailed = {} self.takeEnvDown(fullShutDown=True) + results.put({'test_name': '', 'output': '', + 'done': 0, 'failures': self.testsFailed, + 'shutdown': True}, block=False) results = Queue() # Open group for all tests at the start (parallel execution) @@ -990,27 +999,33 @@ def on_timeout(): currPort += 30 # safe distance for cluster and replicas processes.append(p) p.start() - for _ in self.progressbar(n_jobs): + # Workers send exactly n_jobs per-test messages plus one shutdown + # message each, for a known total. The single shared queue does + # not preserve per-worker ordering, so a fast worker's shutdown + # may arrive before a slow worker's last test. We read every + # message in one bounded loop and tick the progressbar only on + # per-test ones. The has_live_processor guard turns a worker + # crash before it ships its shutdown message into a clean error + # instead of an indefinite hang. + def _get_result(): while True: - # check if we have some lives executors - has_live_processor = False - for p in processes: - if p.is_alive(): - has_live_processor = True - break try: - res = results.get(timeout=1) - break - except Exception as e: - if not has_live_processor: + return results.get(timeout=1) + except Exception: + if not any(p.is_alive() for p in processes): raise Exception('Failed to get job result and no more processors is alive') - print('%s' % res['output'], end="") + + bar_iter = iter(self.progressbar(n_jobs)) + for _ in range(n_jobs + self.parallelism): + res = _get_result() + if res['output']: + print('%s' % res['output'], end="") done += res['done'] self.testsFailed.update(res['failures']) + if not res['shutdown']: + next(bar_iter, None) + next(bar_iter, None) # finalize bar.update(n_jobs) - # All per-test messages are accounted for; workers exit on their own - # once `jobs` drains. The single results queue is drained continuously - # above, so workers can never block on a full pipe. for p in processes: p.join() From fe8ac761af4fda4dcb67ce76641bc20d98e1bc4e Mon Sep 17 00:00:00 2001 From: GuyAv46 Date: Sun, 26 Apr 2026 12:00:15 +0300 Subject: [PATCH 6/6] Address review: ship shutdown sentinel from on_timeout; misc fixups - on_timeout now ships both the per-test result (with shutdown=False) and the shutdown sentinel before the watcher thread calls os._exit(1), keeping the coordinator's bounded count of n_jobs + parallelism accurate when a worker dies on timeout. Also fixes a KeyError on the missing 'shutdown' key in the timeout payload. - Grammar: 'no more processors is alive' -> 'are alive'. - test_parallel_drain: use time.monotonic() for elapsed measurement. --- RLTest/__main__.py | 18 +++++++++++++----- tests/unit/test_parallel_drain.py | 4 ++-- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/RLTest/__main__.py b/RLTest/__main__.py index 44b88d8..6414123 100644 --- a/RLTest/__main__.py +++ b/RLTest/__main__.py @@ -962,11 +962,19 @@ def on_timeout(): except Exception as e: self.handleFailure(testFullName=test.name, testname=test.name, error_msg=Colors.Bred('Exception on timeout function %s' % str(e))) finally: - # The watcher thread calls os._exit(1) right after this - # returns, bypassing Python finalization. close() + - # join_thread() flushes the put to the pipe first. + # The watcher thread calls os._exit(1) right after + # this returns, bypassing Python finalization and + # the normal post-loop shutdown put. Ship both the + # per-test result and the shutdown sentinel here so + # the coordinator's bounded count of + # n_jobs + parallelism remains accurate. close() + + # join_thread() flushes both puts to the pipe first. results.put({'test_name': test.name, 'output': output.getvalue(), - 'done': 1, 'failures': self.testsFailed}, block=False) + 'done': 1, 'failures': self.testsFailed, + 'shutdown': False}, block=False) + results.put({'test_name': '', 'output': '', + 'done': 0, 'failures': {}, + 'shutdown': True}, block=False) results.close() results.join_thread() @@ -1013,7 +1021,7 @@ def _get_result(): return results.get(timeout=1) except Exception: if not any(p.is_alive() for p in processes): - raise Exception('Failed to get job result and no more processors is alive') + raise Exception('Failed to get job result and no more processors are alive') bar_iter = iter(self.progressbar(n_jobs)) for _ in range(n_jobs + self.parallelism): diff --git a/tests/unit/test_parallel_drain.py b/tests/unit/test_parallel_drain.py index 888a460..1270c3b 100644 --- a/tests/unit/test_parallel_drain.py +++ b/tests/unit/test_parallel_drain.py @@ -68,11 +68,11 @@ def test_continuous_drain_does_not_hang(self): # Mimic the coordinator's progressbar loop: drain every per-test # message live, in the same thread, while workers are still running. expected = _NUM_WORKERS * _MSGS_PER_WORKER - start = time.time() + start = time.monotonic() collected = [results.get(timeout=_JOIN_TIMEOUT_SECS) for _ in range(expected)] for p in self._procs: p.join(timeout=_JOIN_TIMEOUT_SECS) - elapsed = time.time() - start + elapsed = time.monotonic() - start for p in self._procs: self.assertFalse(