fix(web): handle BrokenPipeError in SessionProcess.send_message#2324
fix(web): handle BrokenPipeError in SessionProcess.send_message#2324Ricardo-M-L wants to merge 1 commit into
Conversation
`SessionProcess.send_message` writes to `process.stdin` and awaits `drain()` without guarding against the subprocess having exited between the `start()` call and the actual write. In a normal lifecycle `_read_loop` observes the exit and emits a "stopped" / "crashed" status, but there is a window where the caller (FastAPI/websocket handler) will otherwise see a raw `BrokenPipeError` propagate out of `send_message` — crashing the request and leaving any attached websocket clients with no diagnostic. Wrap the write+drain pair in a try/except for `BrokenPipeError` / `ConnectionResetError`, log a warning with the process returncode, and emit an "error" status with reason="stdin_broken" so subscribers learn about the failure synchronously instead of via the eventual read-loop exit message. No behavior change on the happy path.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 07c10f9dcf
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| await self._emit_status( | ||
| "error", | ||
| reason="stdin_broken", | ||
| detail=f"{e.__class__.__name__}: {e}", | ||
| ) |
There was a problem hiding this comment.
Clear in-flight prompts before emitting stdin_broken
When the broken stdin write is for a JSONRPCPromptMessage, the prompt id was already added to _in_flight_prompt_ids, but this new error path emits the stdin_broken status without clearing it. That makes the session report an error while is_busy remains true, so clients reacting immediately to the status can still be rejected by paths such as get_editable_session()'s busy check until _read_loop later catches up; the existing EOF/error paths explicitly clear in-flight ids before broadcasting for this reason. Clear _in_flight_prompt_ids before this _emit_status call.
Useful? React with 👍 / 👎.
| except (BrokenPipeError, ConnectionResetError) as e: | ||
| # Subprocess died between our `start()` check above and the actual write. | ||
| # `_read_loop` will eventually observe the exit and emit "stopped" / | ||
| # "crashed", but right now the caller (FastAPI / websocket handler) would | ||
| # otherwise see a raw exception propagate to the response. Emit an error | ||
| # status so any attached websocket clients see the failure synchronously. | ||
| logger.warning( | ||
| f"send_message: subprocess stdin {e.__class__.__name__}; " | ||
| f"process likely exited (returncode={process.returncode})" | ||
| ) | ||
| await self._emit_status( | ||
| "error", | ||
| reason="stdin_broken", | ||
| detail=f"{e.__class__.__name__}: {e}", | ||
| ) |
There was a problem hiding this comment.
🟡 In-flight prompt ID not cleaned up on BrokenPipeError leaves session stuck in 'busy' state
When send_message handles a JSONRPCPromptMessage, it adds the prompt ID to _in_flight_prompt_ids at line 647 and emits a "busy" status at line 649 before the stdin write attempt. If the write then fails with BrokenPipeError, the exception handler (lines 667-681) emits an "error" status but never removes the prompt ID from _in_flight_prompt_ids. This leaves is_busy returning True even though the prompt was never delivered to the subprocess.
The comment says _read_loop will eventually clean up, but there's a race window where _read_loop may have already completed its EOF handling (and called clear()) before the ID was added at line 647. In that case, no one clears the orphaned ID until the next start() call or the error-state recovery path in sessions.py:1011-1016. During this window, the session is simultaneously in "error" and "busy" states, and non-prompt messages (like cancel) that check is_busy will behave incorrectly.
| except (BrokenPipeError, ConnectionResetError) as e: | |
| # Subprocess died between our `start()` check above and the actual write. | |
| # `_read_loop` will eventually observe the exit and emit "stopped" / | |
| # "crashed", but right now the caller (FastAPI / websocket handler) would | |
| # otherwise see a raw exception propagate to the response. Emit an error | |
| # status so any attached websocket clients see the failure synchronously. | |
| logger.warning( | |
| f"send_message: subprocess stdin {e.__class__.__name__}; " | |
| f"process likely exited (returncode={process.returncode})" | |
| ) | |
| await self._emit_status( | |
| "error", | |
| reason="stdin_broken", | |
| detail=f"{e.__class__.__name__}: {e}", | |
| ) | |
| except (BrokenPipeError, ConnectionResetError) as e: | |
| # Subprocess died between our `start()` check above and the actual write. | |
| # `_read_loop` will eventually observe the exit and emit "stopped" / | |
| # "crashed", but right now the caller (FastAPI / websocket handler) would | |
| # otherwise see a raw exception propagate to the response. Emit an error | |
| # status so any attached websocket clients see the failure synchronously. | |
| logger.warning( | |
| f"send_message: subprocess stdin {e.__class__.__name__}; " | |
| f"process likely exited (returncode={process.returncode})" | |
| ) | |
| self._in_flight_prompt_ids.clear() | |
| await self._emit_status( | |
| "error", | |
| reason="stdin_broken", | |
| detail=f"{e.__class__.__name__}: {e}", | |
| ) |
Was this helpful? React with 👍 or 👎 to provide feedback.
What does this PR do?
SessionProcess.send_messageinsrc/kimi_cli/web/runner/process.pywrites toprocess.stdinand awaitsdrain()without guarding against the subprocess having exited between thestart()call at the top of the method and the actual write at the bottom:In a normal lifecycle
_read_loopobserves the exit and emits a"stopped"/"crashed"status. But there is a window — subprocess crashes, hits OOM, or is killed by an external signal — wheresend_message()reaches the write before_read_loopruns. The result is a rawBrokenPipeError(orConnectionResetErroron some platforms) propagating up to the FastAPI / websocket handler, surfacing as a 500 to the client with noerrorstatus emitted to any attached websocket subscribers.Fix
Wrap the write + drain pair in a try/except for
BrokenPipeErrorandConnectionResetError, log a warning withprocess.returncode, and emit an"error"status withreason="stdin_broken"so subscribers see the failure synchronously — instead of waiting on_read_loopto eventually emit a different terminal status.No behavior change on the happy path. 14-line addition inside the existing function.
Repro
In a long-running web session, manually
kill -9the subprocess (or trigger an OOM in it) and immediately send a message. Without this PR, the nextsend_message()call raises and the websocket clients see only_read_loop's eventual status; the API call itself crashes with a 500.Test plan
python3.11 -c "import ast; ast.parse(open('.../process.py').read())"— passestry