95 changes: 56 additions & 39 deletions RLTest/__main__.py
@@ -941,44 +941,60 @@ def on_timeout():

self.takeEnvDown(fullShutDown=True)

def run_jobs(jobs, results, summary, port):
def run_jobs(jobs, results, port):
Defaults.port = port
done = 0
while True:
try:
test = jobs.get(timeout=0.1)
except Exception as e:
break

# Reset per-test: addFailure() in this worker writes into this
# dict, which is shipped as-is. The coordinator owns the
# cumulative testsFailed; the worker keeps no state across tests.
self.testsFailed = {}
output = io.StringIO()
with redirect_stdout(output):
def on_timeout():
nonlocal done
try:
done += 1
self.killEnvWithSegFault()
self.handleFailure(testFullName=test.name, testname=test.name, error_msg=Colors.Bred('Test timeout'))
except Exception as e:
self.handleFailure(testFullName=test.name, testname=test.name, error_msg=Colors.Bred('Exception on timeout function %s' % str(e)))
finally:
results.put({'test_name': test.name, "output": output.getvalue()}, block=False)
summary.put({'done': done, 'failures': self.testsFailed}, block=False)
# After we return the processes will be killed, so we must make sure the queues are drained properly.
# The watcher thread calls os._exit(1) right after
# this returns, bypassing Python finalization and
# the normal post-loop shutdown put. Ship both the
# per-test result and the shutdown sentinel here so
# the coordinator's bounded count of
# n_jobs + parallelism remains accurate. close() +
# join_thread() flushes both puts to the pipe first.
results.put({'test_name': test.name, 'output': output.getvalue(),
'done': 1, 'failures': self.testsFailed,
'shutdown': False}, block=False)
results.put({'test_name': '<worker shutdown>', 'output': '',
'done': 0, 'failures': {},
'shutdown': True}, block=False)
results.close()
summary.close()
summary.join_thread()
results.join_thread()
done += self.run_single_test(test, on_timeout)

results.put({'test_name': test.name, "output": output.getvalue()}, block=False)
done_delta = self.run_single_test(test, on_timeout)

self.takeEnvDown(fullShutDown=True)
results.put({'test_name': test.name, 'output': output.getvalue(),
'done': done_delta, 'failures': self.testsFailed,
'shutdown': False}, block=False)

# serialized the results back
summary.put({'done': done, 'failures': self.testsFailed}, block=False)
# Always ship one shutdown message per worker so the coordinator
# reads a known total of n_jobs + parallelism messages. Captures
# failures raised during final shutdown (e.g. "redis did not exit
# cleanly" when env_reuse=True).
self.testsFailed = {}
self.takeEnvDown(fullShutDown=True)
results.put({'test_name': '<worker shutdown>', 'output': '',
'done': 0, 'failures': self.testsFailed,
'shutdown': True}, block=False)

results = Queue()
summary = Queue()
# Open group for all tests at the start (parallel execution)
self._openGitHubActionsTestsGroup()
if self.parallelism == 1:
@@ -987,39 +1003,40 @@ def on_timeout():
processes = []
currPort = Defaults.port
for i in range(self.parallelism):
p = Process(target=run_jobs, args=(jobs,results,summary,currPort))
p = Process(target=run_jobs, args=(jobs,results,currPort))
currPort += 30 # safe distance for cluster and replicas
processes.append(p)
p.start()
for _ in self.progressbar(n_jobs):
# Workers send exactly n_jobs per-test messages plus one shutdown
# message each, for a known total. The single shared queue does
# not preserve per-worker ordering, so a fast worker's shutdown
# may arrive before a slow worker's last test. We read every
# message in one bounded loop and tick the progressbar only on
# per-test ones. The has_live_processor guard turns a worker
# crash before it ships its shutdown message into a clean error
# instead of an indefinite hang.
def _get_result():
while True:
# check if we have some lives executors
has_live_processor = False
for p in processes:
if p.is_alive():
has_live_processor = True
break
try:
res = results.get(timeout=1)
break
except Exception as e:
if not has_live_processor:
raise Exception('Failed to get job result and no more processors is alive')
output = res['output']
print('%s' % output, end="")
return results.get(timeout=1)
except Exception:
if not any(p.is_alive() for p in processes):
raise Exception('Failed to get job result and no more processors are alive')

bar_iter = iter(self.progressbar(n_jobs))
for _ in range(n_jobs + self.parallelism):
res = _get_result()
if res['output']:
print('%s' % res['output'], end="")
done += res['done']
self.testsFailed.update(res['failures'])
Copilot AI Apr 26, 2026
self.testsFailed.update(res['failures']) can overwrite existing failure lists when the incoming dict contains a key that already exists (e.g., the shutdown message can carry failures keyed by self.currEnv.testName, which may match a test that already failed earlier). This risks losing earlier failure details in the final summary. Consider merging per-key lists (extend) instead of dict.update(), or ensure shutdown-phase failures are reported under a dedicated key that cannot collide with real test names.

Suggested change
self.testsFailed.update(res['failures'])
for test_name, failures in res['failures'].items():
if test_name not in self.testsFailed:
self.testsFailed[test_name] = list(failures)
else:
self.testsFailed[test_name].extend(failures)

Contributor Author
Only relevant for reused envs, will be handled in the future if needed
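To see the reviewer's concern in isolation, here is a minimal sketch (hypothetical test names and failure messages, not RLTest data) of how dict.update() replaces an existing failure list for a colliding key, while a per-key merge keeps both entries:

# Illustrative only: the keys and messages below are made up.
incoming = {'test_foo': ['redis did not exit cleanly']}

# dict.update() overwrites the whole list for a key that already exists.
failures = {'test_foo': ['assertion error in run 1']}
failures.update(incoming)
assert failures == {'test_foo': ['redis did not exit cleanly']}

# A per-key merge extends the existing list instead.
failures = {'test_foo': ['assertion error in run 1']}
for name, errs in incoming.items():
    failures.setdefault(name, []).extend(errs)
assert failures == {'test_foo': ['assertion error in run 1',
                                 'redis did not exit cleanly']}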

if not res['shutdown']:
next(bar_iter, None)
next(bar_iter, None) # finalize bar.update(n_jobs)

for p in processes:
p.join()

# join results
while True:
try:
res = summary.get(timeout=1)
except Exception as e:
break
done += res['done']
self.testsFailed.update(res['failures'])

endTime = time.time()

# Close group after all tests complete (parallel execution)
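As an aside, the counting protocol the new coordinator loop relies on can be shown in a minimal standalone sketch (simplified, hypothetical names and counts; not the RLTest implementation): each worker emits one message per test it ran plus exactly one shutdown sentinel, so the coordinator can drain the shared queue by reading a fixed total of n_jobs + parallelism messages, regardless of how the workers' messages interleave.

import multiprocessing as mp


def worker(jobs, results):
    # One message per test pulled from the shared jobs queue...
    while True:
        try:
            test = jobs.get(timeout=0.1)
        except Exception:
            break
        results.put({'test_name': test, 'done': 1, 'shutdown': False})
    # ...plus exactly one shutdown sentinel per worker, so the coordinator
    # knows the total message count up front.
    results.put({'test_name': '<worker shutdown>', 'done': 0, 'shutdown': True})


if __name__ == '__main__':
    parallelism, n_jobs = 4, 20
    jobs, results = mp.Queue(), mp.Queue()
    for i in range(n_jobs):
        jobs.put('test_%d' % i)
    procs = [mp.Process(target=worker, args=(jobs, results))
             for _ in range(parallelism)]
    for p in procs:
        p.start()

    done = 0
    # Drain exactly n_jobs + parallelism messages while the workers run;
    # per-test messages and shutdown sentinels may arrive in any order.
    for _ in range(n_jobs + parallelism):
        done += results.get(timeout=30)['done']
    for p in procs:
        p.join()
    assert done == n_jobs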
86 changes: 86 additions & 0 deletions tests/unit/test_parallel_drain.py
@@ -0,0 +1,86 @@
"""Regression test for a hang in the parallel test coordinator.

The parallel coordinator uses a single ``results`` queue carrying one message
per test. Workers push these messages while the coordinator drains them in its
progressbar loop. If the coordinator ever stops draining before workers stop
pushing, large per-test outputs saturate the pipe (~64 KiB on Linux), worker
``put()`` calls block in ``pipe_write``, and ``p.join()`` hangs indefinitely.

This test reproduces the saturation scenario by spawning workers that each
push many large messages, then asserts that a coordinator that drains
continuously throughout the workers' lifetime finishes promptly with every
message accounted for and every worker cleanly exited.
"""

import multiprocessing as mp
import sys
import time
from unittest import TestCase


# ~32 KiB per message × 8 workers × 8 messages = 2 MiB total, well over the
# typical 64 KiB pipe buffer on Linux, so writers will block on ``pipe_write``
# unless the parent is actively reading throughout.
_PAYLOAD_BYTES = 32 * 1024
_NUM_WORKERS = 8
_MSGS_PER_WORKER = 8
_JOIN_TIMEOUT_SECS = 30.0


def _worker_puts_many_results(results, n_msgs, payload_bytes):
    # Queue.put is async (via a feeder thread), but at process exit the feeder
    # must flush the buffered items to the pipe before the worker can exit. If
    # the parent is not draining, that flush blocks forever.
    payload = 'x' * payload_bytes
    for i in range(n_msgs):
        results.put({'test_name': 't%d' % i, 'output': payload,
                     'done': 1, 'failures': {}})


class TestParallelResultsDrain(TestCase):

    def setUp(self):
        if sys.platform == 'win32':
            self.skipTest('fork start method is unavailable on Windows')
        self._ctx = mp.get_context('fork')
        self._procs = []

    def tearDown(self):
        # Safety net: if the test ever hangs despite the fix, make sure the
        # pytest session can still exit cleanly.
        for p in self._procs:
            if p.is_alive():
                p.kill()
            p.join(timeout=5)

    def test_continuous_drain_does_not_hang(self):
        results = self._ctx.Queue()
        self._procs = [
            self._ctx.Process(
                target=_worker_puts_many_results,
                args=(results, _MSGS_PER_WORKER, _PAYLOAD_BYTES),
            )
            for _ in range(_NUM_WORKERS)
        ]
        for p in self._procs:
            p.start()

        # Mimic the coordinator's progressbar loop: drain every per-test
        # message live, in the same thread, while workers are still running.
        expected = _NUM_WORKERS * _MSGS_PER_WORKER
        start = time.monotonic()
        collected = [results.get(timeout=_JOIN_TIMEOUT_SECS) for _ in range(expected)]
        for p in self._procs:
            p.join(timeout=_JOIN_TIMEOUT_SECS)
        elapsed = time.monotonic() - start

        for p in self._procs:
            self.assertFalse(
                p.is_alive(),
                'worker still alive after join; results pipe likely saturated',
            )
            self.assertEqual(p.exitcode, 0)
        self.assertEqual(len(collected), expected)
        # The drain should return well under its own timeout; we only assert a
        # loose upper bound to avoid flakiness on slow machines.
        self.assertLess(elapsed, _JOIN_TIMEOUT_SECS)
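
Usage note: the new file is a plain unittest TestCase with no RLTest-specific fixtures, so it should be runnable on its own, e.g. via python -m pytest tests/unit/test_parallel_drain.py -q (assuming pytest is the project's test runner, as the tearDown comment suggests).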