From bfe38ac0c7c070654da1d0d1a9e6e96f9d53e8d0 Mon Sep 17 00:00:00 2001 From: nightjoker7 Date: Wed, 22 Apr 2026 19:08:32 -0500 Subject: [PATCH] StreamInterface: prevent socket/reader-thread leak on handshake failure in __init__ MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If connect() or waitForConfig() raises during __init__ (handshake timeout, bad stream, config error), the reader thread started by connect() keeps running and the underlying stream/socket stays open — but the caller never receives a reference to the half-initialized instance, so they cannot call close() themselves. The leak compounds on every retry from a caller's reconnect loop. Fix: wrap connect() + waitForConfig() in try/except; call self.close() on any exception before re-raising. Also guard close() against RuntimeError from joining an unstarted reader thread (happens when close() runs from a failed __init__ before connect() could spawn it). Discovered while debugging a real-world Meshtastic firmware crash where a passive logger's retrying TCPInterface() calls against a node with 250-entry NodeDB produced a reconnect storm — every retry triggered a full config+NodeDB dump on the node, compounding heap pressure, which then exposed null-deref bugs in Router::perhapsDecode / MeshService (firmware side fixed in meshtastic/firmware#10226 and #10229). The client-side leak is independent of those firmware bugs and worth fixing on its own. --- meshtastic/stream_interface.py | 23 +++++- meshtastic/tests/test_stream_interface.py | 95 +++++++++++++++++++++++ 2 files changed, 114 insertions(+), 4 deletions(-) diff --git a/meshtastic/stream_interface.py b/meshtastic/stream_interface.py index 06ee28a3a..06791ffe5 100644 --- a/meshtastic/stream_interface.py +++ b/meshtastic/stream_interface.py @@ -1,5 +1,6 @@ """Stream Interface base class """ +import contextlib import io import logging import threading @@ -61,9 +62,17 @@ def __init__( # pylint: disable=R0917 # Start the reader thread after superclass constructor completes init if connectNow: - self.connect() - if not noProto: - self.waitForConfig() + try: + self.connect() + if not noProto: + self.waitForConfig() + except Exception: + # If the handshake raises, the caller never receives a reference + # to this instance and cannot call close() themselves. Clean up + # the reader thread + stream here so retries don't leak. + with contextlib.suppress(Exception): + self.close() + raise def connect(self) -> None: """Connect to our radio @@ -136,7 +145,13 @@ def close(self) -> None: # reader thread to close things for us self._wantExit = True if self._rxThread != threading.current_thread(): - self._rxThread.join() # wait for it to exit + try: + self._rxThread.join() # wait for it to exit + except RuntimeError: + # Thread was never started — happens when close() is invoked + # from a failed __init__ before connect() could spawn it. + # Nothing to join; safe to ignore. + pass def _handleLogByte(self, b): """Handle a byte that is part of a log message from the device.""" diff --git a/meshtastic/tests/test_stream_interface.py b/meshtastic/tests/test_stream_interface.py index 3411ffb81..db09f44d8 100644 --- a/meshtastic/tests/test_stream_interface.py +++ b/meshtastic/tests/test_stream_interface.py @@ -18,6 +18,101 @@ def test_StreamInterface(): assert pytest_wrapped_e.type == Exception +@pytest.mark.unit +@pytest.mark.usefixtures("reset_mt_config") +def test_StreamInterface_close_safe_when_thread_never_started(): + """close() must not raise RuntimeError when called before connect() has started the reader. + + Hits the cleanup path used by __init__ when the handshake raises before the + reader thread is started. + """ + iface = StreamInterface(noProto=True, connectNow=False) + iface.stream = MagicMock() + # _rxThread was created in __init__ but never .start()'d. close() should + # detect that and skip join() instead of raising RuntimeError. + iface.close() + + +@pytest.mark.unit +@pytest.mark.usefixtures("reset_mt_config") +def test_StreamInterface_init_cleans_up_when_connect_raises(): + """If connect() raises during __init__, close() runs and the original exception propagates.""" + + cleanup_calls = [] + + class FailingConnectStream(StreamInterface): + """Subclass whose connect() raises, to exercise the __init__ cleanup path.""" + + def __init__(self): + self.stream = MagicMock() # bypass StreamInterface abstract check + super().__init__(noProto=False, connectNow=True) + + def connect(self): + raise RuntimeError("simulated handshake failure") + + def close(self): + cleanup_calls.append("close") + super().close() + + with pytest.raises(RuntimeError, match="simulated handshake failure"): + FailingConnectStream() + assert cleanup_calls == ["close"], "close() should be invoked exactly once on handshake failure" + + +@pytest.mark.unit +@pytest.mark.usefixtures("reset_mt_config") +def test_StreamInterface_init_cleans_up_when_waitForConfig_raises(): + """If waitForConfig() raises after a successful connect(), close() runs and exception propagates.""" + + cleanup_calls = [] + + class FailingWaitStream(StreamInterface): + """Subclass whose waitForConfig() raises, to exercise the second leg of cleanup.""" + + def __init__(self): + self.stream = MagicMock() + super().__init__(noProto=False, connectNow=True) + + def connect(self): + # No-op connect — we are simulating handshake-stage failure, not connect-stage. + pass + + def waitForConfig(self): + raise TimeoutError("simulated config-handshake timeout") + + def close(self): + cleanup_calls.append("close") + super().close() + + with pytest.raises(TimeoutError, match="simulated config-handshake timeout"): + FailingWaitStream() + assert cleanup_calls == ["close"], "close() should be invoked exactly once on handshake timeout" + + +@pytest.mark.unit +@pytest.mark.usefixtures("reset_mt_config") +def test_StreamInterface_init_cleanup_does_not_shadow_original_exception(): + """If close() itself raises during __init__ cleanup, the original exception still propagates. + + The cleanup uses contextlib.suppress(Exception) so that a secondary failure + in close() doesn't replace the real reason for the failed handshake. + """ + + class CleanupRaisesStream(StreamInterface): + def __init__(self): + self.stream = MagicMock() + super().__init__(noProto=False, connectNow=True) + + def connect(self): + raise RuntimeError("original handshake failure") + + def close(self): + raise RuntimeError("secondary close failure — should be suppressed") + + with pytest.raises(RuntimeError, match="original handshake failure"): + CleanupRaisesStream() + + # Note: This takes a bit, so moving from unit to slow @pytest.mark.unitslow @pytest.mark.usefixtures("reset_mt_config")