Fix parallel-coordinator hang when summary pipe saturates #248
Changes from all commits
4a8a452
c8df5bc
c0cb696
dfc9114
3a50d5a
fe8ac76
5f512fa
@@ -941,44 +941,60 @@ def on_timeout():

self.takeEnvDown(fullShutDown=True)

def run_jobs(jobs, results, summary, port):
def run_jobs(jobs, results, port):
Defaults.port = port
done = 0
while True:
try:
test = jobs.get(timeout=0.1)
except Exception as e:
break

# Reset per-test: addFailure() in this worker writes into this
# dict, which is shipped as-is. The coordinator owns the
# cumulative testsFailed; the worker keeps no state across tests.
self.testsFailed = {}
output = io.StringIO()
with redirect_stdout(output):
def on_timeout():
nonlocal done
try:
done += 1
self.killEnvWithSegFault()
self.handleFailure(testFullName=test.name, testname=test.name, error_msg=Colors.Bred('Test timeout'))
except Exception as e:
self.handleFailure(testFullName=test.name, testname=test.name, error_msg=Colors.Bred('Exception on timeout function %s' % str(e)))
finally:
results.put({'test_name': test.name, "output": output.getvalue()}, block=False)
summary.put({'done': done, 'failures': self.testsFailed}, block=False)
# After we return the processes will be killed, so we must make sure the queues are drained properly.
# The watcher thread calls os._exit(1) right after
# this returns, bypassing Python finalization and
# the normal post-loop shutdown put. Ship both the
# per-test result and the shutdown sentinel here so
# the coordinator's bounded count of
# n_jobs + parallelism remains accurate. close() +
# join_thread() flushes both puts to the pipe first.
results.put({'test_name': test.name, 'output': output.getvalue(),
'done': 1, 'failures': self.testsFailed,
'shutdown': False}, block=False)
results.put({'test_name': '<worker shutdown>', 'output': '',
'done': 0, 'failures': {},
'shutdown': True}, block=False)
results.close()
summary.close()
summary.join_thread()
results.join_thread()
done += self.run_single_test(test, on_timeout)

results.put({'test_name': test.name, "output": output.getvalue()}, block=False)
done_delta = self.run_single_test(test, on_timeout)

self.takeEnvDown(fullShutDown=True)
results.put({'test_name': test.name, 'output': output.getvalue(),
'done': done_delta, 'failures': self.testsFailed,
'shutdown': False}, block=False)

# serialized the results back
summary.put({'done': done, 'failures': self.testsFailed}, block=False)
# Always ship one shutdown message per worker so the coordinator
# reads a known total of n_jobs + parallelism messages. Captures
# failures raised during final shutdown (e.g. "redis did not exit
# cleanly" when env_reuse=True).
self.testsFailed = {}
self.takeEnvDown(fullShutDown=True)
results.put({'test_name': '<worker shutdown>', 'output': '',
'done': 0, 'failures': self.testsFailed,
'shutdown': True}, block=False)

results = Queue()
summary = Queue()
# Open group for all tests at the start (parallel execution)
self._openGitHubActionsTestsGroup()
if self.parallelism == 1:

@@ -987,39 +1003,40 @@ def on_timeout():
processes = []
currPort = Defaults.port
for i in range(self.parallelism):
p = Process(target=run_jobs, args=(jobs,results,summary,currPort))
p = Process(target=run_jobs, args=(jobs,results,currPort))
currPort += 30 # safe distance for cluster and replicas
processes.append(p)
p.start()
for _ in self.progressbar(n_jobs):
# Workers send exactly n_jobs per-test messages plus one shutdown
# message each, for a known total. The single shared queue does
# not preserve per-worker ordering, so a fast worker's shutdown
# may arrive before a slow worker's last test. We read every
# message in one bounded loop and tick the progressbar only on
# per-test ones. The has_live_processor guard turns a worker
# crash before it ships its shutdown message into a clean error
# instead of an indefinite hang.
def _get_result():
while True:
# check if we have some lives executors
has_live_processor = False
for p in processes:
if p.is_alive():
has_live_processor = True
break
try:
res = results.get(timeout=1)
break
except Exception as e:
if not has_live_processor:
raise Exception('Failed to get job result and no more processors is alive')
output = res['output']
print('%s' % output, end="")
return results.get(timeout=1)
except Exception:
if not any(p.is_alive() for p in processes):
raise Exception('Failed to get job result and no more processors are alive')
GuyAv46 marked this conversation as resolved.
bar_iter = iter(self.progressbar(n_jobs))
for _ in range(n_jobs + self.parallelism):
res = _get_result()
if res['output']:
print('%s' % res['output'], end="")
done += res['done']
self.testsFailed.update(res['failures'])
Suggested change:
-self.testsFailed.update(res['failures'])
+for test_name, failures in res['failures'].items():
+    if test_name not in self.testsFailed:
+        self.testsFailed[test_name] = list(failures)
+    else:
+        self.testsFailed[test_name].extend(failures)
Only relevant for reused envs, will be handled in the future if needed
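As context for the thread above, here is a minimal, self-contained sketch of the single-queue protocol this change adopts; the names and counts are illustrative only, not the suite's actual classes. Each worker ships one message per test plus exactly one shutdown sentinel, and the coordinator drains a fixed total of n_jobs + parallelism messages from the same queue it was already reading, so no second pipe can fill up while it is blocked elsewhere.

import multiprocessing as mp

def worker(jobs, results):
    # One message per test; an empty jobs queue means this worker is done.
    while True:
        try:
            test = jobs.get(timeout=1)
        except Exception:
            break
        results.put({'test_name': test, 'output': 'ran %s\n' % test,
                     'done': 1, 'failures': {}, 'shutdown': False})
    # Exactly one shutdown sentinel per worker keeps the coordinator's
    # expected message count at n_jobs + parallelism.
    results.put({'test_name': '<worker shutdown>', 'output': '',
                 'done': 0, 'failures': {}, 'shutdown': True})

if __name__ == '__main__':
    n_jobs, parallelism = 20, 4
    jobs, results = mp.Queue(), mp.Queue()
    for i in range(n_jobs):
        jobs.put('test_%d' % i)
    procs = [mp.Process(target=worker, args=(jobs, results))
             for _ in range(parallelism)]
    for p in procs:
        p.start()
    done = 0
    # Bounded drain: read while workers are still running, so large outputs
    # can never saturate the pipe behind a blocked join().
    for _ in range(n_jobs + parallelism):
        msg = results.get(timeout=30)
        done += msg['done']
    for p in procs:
        p.join()
    print('tests done:', done)  # expected: 20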
@@ -0,0 +1,86 @@
| """Regression test for a hang in the parallel test coordinator. | ||
|
|
||
| The parallel coordinator uses a single ``results`` queue carrying one message | ||
| per test. Workers push these messages while the coordinator drains them in its | ||
| progressbar loop. If the coordinator ever stops draining before workers stop | ||
| pushing, large per-test outputs saturate the pipe (~64 KiB on Linux), worker | ||
| ``put()`` calls block in ``pipe_write``, and ``p.join()`` hangs indefinitely. | ||
|
|
||
| This test reproduces the saturation scenario by spawning workers that each | ||
| push many large messages, then asserts that a coordinator that drains | ||
| continuously throughout the workers' lifetime finishes promptly with every | ||
| message accounted for and every worker cleanly exited. | ||
| """ | ||
|
|
||
| import multiprocessing as mp | ||
| import sys | ||
| import time | ||
| from unittest import TestCase | ||
|
|
||
|
|
||
| # ~32 KiB per message × 8 workers × 8 messages = 2 MiB total, well over the | ||
| # typical 64 KiB pipe buffer on Linux, so writers will block on ``pipe_write`` | ||
| # unless the parent is actively reading throughout. | ||
| _PAYLOAD_BYTES = 32 * 1024 | ||
| _NUM_WORKERS = 8 | ||
| _MSGS_PER_WORKER = 8 | ||
| _JOIN_TIMEOUT_SECS = 30.0 | ||
|
|
||
|
|
||
| def _worker_puts_many_results(results, n_msgs, payload_bytes): | ||
| # Queue.put is async (via a feeder thread), but at process exit the feeder | ||
| # must flush the buffered items to the pipe before the worker can exit. If | ||
| # the parent is not draining, that flush blocks forever. | ||
| payload = 'x' * payload_bytes | ||
| for i in range(n_msgs): | ||
| results.put({'test_name': 't%d' % i, 'output': payload, | ||
| 'done': 1, 'failures': {}}) | ||
|
|
||
|
|
||
| class TestParallelResultsDrain(TestCase): | ||
|
|
||
| def setUp(self): | ||
| if sys.platform == 'win32': | ||
| self.skipTest('fork start method is unavailable on Windows') | ||
| self._ctx = mp.get_context('fork') | ||
| self._procs = [] | ||
|
|
||
| def tearDown(self): | ||
| # Safety net: if the test ever hangs despite the fix, make sure the | ||
| # pytest session can still exit cleanly. | ||
| for p in self._procs: | ||
| if p.is_alive(): | ||
| p.kill() | ||
| p.join(timeout=5) | ||
|
|
||
| def test_continuous_drain_does_not_hang(self): | ||
| results = self._ctx.Queue() | ||
| self._procs = [ | ||
| self._ctx.Process( | ||
| target=_worker_puts_many_results, | ||
| args=(results, _MSGS_PER_WORKER, _PAYLOAD_BYTES), | ||
| ) | ||
| for _ in range(_NUM_WORKERS) | ||
| ] | ||
| for p in self._procs: | ||
| p.start() | ||
|
|
||
| # Mimic the coordinator's progressbar loop: drain every per-test | ||
| # message live, in the same thread, while workers are still running. | ||
| expected = _NUM_WORKERS * _MSGS_PER_WORKER | ||
| start = time.monotonic() | ||
| collected = [results.get(timeout=_JOIN_TIMEOUT_SECS) for _ in range(expected)] | ||
| for p in self._procs: | ||
| p.join(timeout=_JOIN_TIMEOUT_SECS) | ||
| elapsed = time.monotonic() - start | ||
|
|
||
| for p in self._procs: | ||
| self.assertFalse( | ||
| p.is_alive(), | ||
| 'worker still alive after join; results pipe likely saturated', | ||
| ) | ||
| self.assertEqual(p.exitcode, 0) | ||
| self.assertEqual(len(collected), expected) | ||
| # The drain should return well under its own timeout; we only assert a | ||
| # loose upper bound to avoid flakiness on slow machines. | ||
| self.assertLess(elapsed, _JOIN_TIMEOUT_SECS) |
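For reference, the mechanism the docstring above describes — a worker whose queue feeder thread is stuck in pipe_write until the parent reads — can be seen in isolation with a tiny standalone sketch; the sizes and function names here are illustrative, not part of the suite.

import multiprocessing as mp

def _undrained_writer(q):
    # One ~1 MiB message: far larger than the typical 64 KiB pipe buffer on
    # Linux, so the queue's feeder thread blocks in pipe_write at process
    # exit until the parent starts reading.
    q.put('x' * (1024 * 1024))

if __name__ == '__main__':
    q = mp.Queue()
    p = mp.Process(target=_undrained_writer, args=(q,))
    p.start()
    p.join(timeout=2)
    print('alive while parent is not reading:', p.is_alive())  # True: join timed out
    q.get()           # drain the message ...
    p.join(timeout=10)
    print('alive after draining:', p.is_alive())  # False: worker exits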