diff --git a/RLTest/__main__.py b/RLTest/__main__.py
index 3cfae28..6414123 100644
--- a/RLTest/__main__.py
+++ b/RLTest/__main__.py
@@ -941,44 +941,60 @@ def on_timeout():
                     self.takeEnvDown(fullShutDown=True)
 
-        def run_jobs(jobs, results, summary, port):
+        def run_jobs(jobs, results, port):
             Defaults.port = port
-            done = 0
             while True:
                 try:
                     test = jobs.get(timeout=0.1)
                 except Exception as e:
                     break
+                # Reset per-test: addFailure() in this worker writes into this
+                # dict, which is shipped as-is. The coordinator owns the
+                # cumulative testsFailed; the worker keeps no state across tests.
+                self.testsFailed = {}
                 output = io.StringIO()
                 with redirect_stdout(output):
                     def on_timeout():
-                        nonlocal done
                         try:
-                            done += 1
                             self.killEnvWithSegFault()
                             self.handleFailure(testFullName=test.name, testname=test.name,
                                                error_msg=Colors.Bred('Test timeout'))
                         except Exception as e:
                             self.handleFailure(testFullName=test.name, testname=test.name,
                                                error_msg=Colors.Bred('Exception on timeout function %s' % str(e)))
                         finally:
-                            results.put({'test_name': test.name, "output": output.getvalue()}, block=False)
-                            summary.put({'done': done, 'failures': self.testsFailed}, block=False)
-                            # After we return the processes will be killed, so we must make sure the queues are drained properly.
+                            # The watcher thread calls os._exit(1) right after
+                            # this returns, bypassing Python finalization and
+                            # the normal post-loop shutdown put. Ship both the
+                            # per-test result and the shutdown sentinel here so
+                            # the coordinator's bounded count of
+                            # n_jobs + parallelism remains accurate. close() +
+                            # join_thread() flushes both puts to the pipe first.
+                            results.put({'test_name': test.name, 'output': output.getvalue(),
+                                         'done': 1, 'failures': self.testsFailed,
+                                         'shutdown': False}, block=False)
+                            results.put({'test_name': '', 'output': '',
+                                         'done': 0, 'failures': {},
+                                         'shutdown': True}, block=False)
                             results.close()
-                            summary.close()
-                            summary.join_thread()
                             results.join_thread()
 
-                    done += self.run_single_test(test, on_timeout)
-                results.put({'test_name': test.name, "output": output.getvalue()}, block=False)
+                    done_delta = self.run_single_test(test, on_timeout)
 
-            self.takeEnvDown(fullShutDown=True)
+                results.put({'test_name': test.name, 'output': output.getvalue(),
+                             'done': done_delta, 'failures': self.testsFailed,
+                             'shutdown': False}, block=False)
 
-            # serialized the results back
-            summary.put({'done': done, 'failures': self.testsFailed}, block=False)
+            # Always ship one shutdown message per worker so the coordinator
+            # reads a known total of n_jobs + parallelism messages. Captures
+            # failures raised during final shutdown (e.g. "redis did not exit
+            # cleanly" when env_reuse=True).
+            self.testsFailed = {}
+            self.takeEnvDown(fullShutDown=True)
+            results.put({'test_name': '', 'output': '',
+                         'done': 0, 'failures': self.testsFailed,
+                         'shutdown': True}, block=False)
 
         results = Queue()
-        summary = Queue()
         # Open group for all tests at the start (parallel execution)
         self._openGitHubActionsTestsGroup()
         if self.parallelism == 1:
@@ -987,39 +1003,40 @@ def on_timeout():
         processes = []
         currPort = Defaults.port
         for i in range(self.parallelism):
-            p = Process(target=run_jobs, args=(jobs,results,summary,currPort))
+            p = Process(target=run_jobs, args=(jobs,results,currPort))
             currPort += 30 # safe distance for cluster and replicas
             processes.append(p)
             p.start()
 
-        for _ in self.progressbar(n_jobs):
+        # Workers send exactly n_jobs per-test messages plus one shutdown
+        # message each, for a known total. The single shared queue does
+        # not preserve per-worker ordering, so a fast worker's shutdown
+        # may arrive before a slow worker's last test. We read every
+        # message in one bounded loop and tick the progressbar only on
+        # per-test ones. The liveness guard in _get_result() turns a worker
+        # crash before it ships its shutdown message into a clean error
+        # instead of an indefinite hang.
+        def _get_result():
             while True:
-                # check if we have some lives executors
-                has_live_processor = False
-                for p in processes:
-                    if p.is_alive():
-                        has_live_processor = True
-                        break
                 try:
-                    res = results.get(timeout=1)
-                    break
-                except Exception as e:
-                    if not has_live_processor:
-                        raise Exception('Failed to get job result and no more processors is alive')
-            output = res['output']
-            print('%s' % output, end="")
+                    return results.get(timeout=1)
+                except Exception:
+                    if not any(p.is_alive() for p in processes):
+                        raise Exception('Failed to get job result and no more processors are alive')
+
+        bar_iter = iter(self.progressbar(n_jobs))
+        for _ in range(n_jobs + self.parallelism):
+            res = _get_result()
+            if res['output']:
+                print('%s' % res['output'], end="")
+            done += res['done']
+            self.testsFailed.update(res['failures'])
+            if not res['shutdown']:
+                next(bar_iter, None)
+        next(bar_iter, None)  # finalize bar.update(n_jobs)
 
         for p in processes:
             p.join()
 
-        # join results
-        while True:
-            try:
-                res = summary.get(timeout=1)
-            except Exception as e:
-                break
-            done += res['done']
-            self.testsFailed.update(res['failures'])
-
         endTime = time.time()
         # Close group after all tests complete (parallel execution)
diff --git a/tests/unit/test_parallel_drain.py b/tests/unit/test_parallel_drain.py
new file mode 100644
index 0000000..1270c3b
--- /dev/null
+++ b/tests/unit/test_parallel_drain.py
@@ -0,0 +1,86 @@
+"""Regression test for a hang in the parallel test coordinator.
+
+The parallel coordinator uses a single ``results`` queue carrying one message
+per test. Workers push these messages while the coordinator drains them in its
+progressbar loop. If the coordinator ever stops draining before workers stop
+pushing, large per-test outputs saturate the pipe (~64 KiB on Linux), worker
+``put()`` calls block in ``pipe_write``, and ``p.join()`` hangs indefinitely.
+
+This test reproduces the saturation scenario by spawning workers that each
+push many large messages, then asserts that a coordinator that drains
+continuously throughout the workers' lifetime finishes promptly with every
+message accounted for and every worker cleanly exited.
+"""
+
+import multiprocessing as mp
+import sys
+import time
+from unittest import TestCase
+
+
+# ~32 KiB per message × 8 workers × 8 messages = 2 MiB total, well over the
+# typical 64 KiB pipe buffer on Linux, so writers will block on ``pipe_write``
+# unless the parent is actively reading throughout.
+_PAYLOAD_BYTES = 32 * 1024
+_NUM_WORKERS = 8
+_MSGS_PER_WORKER = 8
+_JOIN_TIMEOUT_SECS = 30.0
+
+
+def _worker_puts_many_results(results, n_msgs, payload_bytes):
+    # Queue.put is async (via a feeder thread), but at process exit the feeder
+    # must flush the buffered items to the pipe before the worker can exit. If
+    # the parent is not draining, that flush blocks forever.
+    payload = 'x' * payload_bytes
+    for i in range(n_msgs):
+        results.put({'test_name': 't%d' % i, 'output': payload,
+                     'done': 1, 'failures': {}})
+
+
+class TestParallelResultsDrain(TestCase):
+
+    def setUp(self):
+        if sys.platform == 'win32':
+            self.skipTest('fork start method is unavailable on Windows')
+        self._ctx = mp.get_context('fork')
+        self._procs = []
+
+    def tearDown(self):
+        # Safety net: if the test ever hangs despite the fix, make sure the
+        # pytest session can still exit cleanly.
+        for p in self._procs:
+            if p.is_alive():
+                p.kill()
+            p.join(timeout=5)
+
+    def test_continuous_drain_does_not_hang(self):
+        results = self._ctx.Queue()
+        self._procs = [
+            self._ctx.Process(
+                target=_worker_puts_many_results,
+                args=(results, _MSGS_PER_WORKER, _PAYLOAD_BYTES),
+            )
+            for _ in range(_NUM_WORKERS)
+        ]
+        for p in self._procs:
+            p.start()
+
+        # Mimic the coordinator's progressbar loop: drain every per-test
+        # message live, in the same thread, while workers are still running.
+        expected = _NUM_WORKERS * _MSGS_PER_WORKER
+        start = time.monotonic()
+        collected = [results.get(timeout=_JOIN_TIMEOUT_SECS) for _ in range(expected)]
+        for p in self._procs:
+            p.join(timeout=_JOIN_TIMEOUT_SECS)
+        elapsed = time.monotonic() - start
+
+        for p in self._procs:
+            self.assertFalse(
+                p.is_alive(),
+                'worker still alive after join; results pipe likely saturated',
+            )
+            self.assertEqual(p.exitcode, 0)
+        self.assertEqual(len(collected), expected)
+        # The drain should return well under its own timeout; we only assert a
+        # loose upper bound to avoid flakiness on slow machines.
+        self.assertLess(elapsed, _JOIN_TIMEOUT_SECS)
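
For readers who want to see the shutdown-sentinel accounting in isolation, the following is a minimal, self-contained sketch of the protocol the patch implements, not RLTest code: the names (worker, coordinate, job_names) and the 30-second timeout are illustrative assumptions. The invariant it demonstrates is the one described in the coordinator comment above: each worker sends one message per job plus exactly one shutdown sentinel, so the coordinator drains a known total of n_jobs + n_workers messages and never stops reading while a worker still has something to write.

# Illustrative sketch only; names and timeouts are assumptions, not RLTest's API.
import multiprocessing as mp
import queue


def worker(jobs, results):
    # One result message per job; the jobs queue is pre-filled, so an empty
    # read means there is no more work for this worker.
    while True:
        try:
            job = jobs.get(timeout=0.1)
        except queue.Empty:
            break
        results.put({'test_name': job, 'output': 'ran %s\n' % job,
                     'done': 1, 'failures': {}, 'shutdown': False})
    # Exactly one shutdown sentinel per worker, sent last.
    results.put({'test_name': '', 'output': '',
                 'done': 0, 'failures': {}, 'shutdown': True})


def coordinate(job_names, n_workers):
    jobs, results = mp.Queue(), mp.Queue()
    for name in job_names:
        jobs.put(name)
    procs = [mp.Process(target=worker, args=(jobs, results))
             for _ in range(n_workers)]
    for p in procs:
        p.start()

    done, failures = 0, {}
    # Bounded, continuous drain: read every message the workers will ever
    # send, counting progress only on per-test messages, so a large 'output'
    # payload can never saturate the results pipe and block a worker's put().
    for _ in range(len(job_names) + n_workers):
        msg = results.get(timeout=30)
        done += msg['done']
        failures.update(msg['failures'])
        if not msg['shutdown']:
            print(msg['output'], end='')
    for p in procs:
        p.join()
    return done, failures


if __name__ == '__main__':
    print(coordinate(['test_%d' % i for i in range(10)], n_workers=4))

If the coordinator stopped reading while per-test outputs were still queued (the failure mode the old separate summary queue could fall into), any put() larger than the pipe buffer would block and p.join() would hang, which is exactly the scenario tests/unit/test_parallel_drain.py exercises.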