Skip to content

fix(web): handle BrokenPipeError in SessionProcess.send_message#2324

Open
Ricardo-M-L wants to merge 1 commit into
MoonshotAI:mainfrom
Ricardo-M-L:contrib/fix-send-message-broken-pipe
Open

fix(web): handle BrokenPipeError in SessionProcess.send_message#2324
Ricardo-M-L wants to merge 1 commit into
MoonshotAI:mainfrom
Ricardo-M-L:contrib/fix-send-message-broken-pipe

Conversation

@Ricardo-M-L
Copy link
Copy Markdown

@Ricardo-M-L Ricardo-M-L commented May 19, 2026

What does this PR do?

SessionProcess.send_message in src/kimi_cli/web/runner/process.py writes to process.stdin and awaits drain() without guarding against the subprocess having exited between the start() call at the top of the method and the actual write at the bottom:

process.stdin.write((message + "\n").encode("utf-8"))
await process.stdin.drain()

In a normal lifecycle _read_loop observes the exit and emits a "stopped" / "crashed" status. But there is a window — subprocess crashes, hits OOM, or is killed by an external signal — where send_message() reaches the write before _read_loop runs. The result is a raw BrokenPipeError (or ConnectionResetError on some platforms) propagating up to the FastAPI / websocket handler, surfacing as a 500 to the client with no error status emitted to any attached websocket subscribers.

Fix

Wrap the write + drain pair in a try/except for BrokenPipeError and ConnectionResetError, log a warning with process.returncode, and emit an "error" status with reason="stdin_broken" so subscribers see the failure synchronously — instead of waiting on _read_loop to eventually emit a different terminal status.

No behavior change on the happy path. 14-line addition inside the existing function.

Repro

In a long-running web session, manually kill -9 the subprocess (or trigger an OOM in it) and immediately send a message. Without this PR, the next send_message() call raises and the websocket clients see only _read_loop's eventual status; the API call itself crashes with a 500.

Test plan

  • python3.11 -c "import ast; ast.parse(open('.../process.py').read())" — passes
  • Manual code review for happy-path equivalence — the original two-line write/drain is preserved unchanged inside the try

Open in Devin Review

`SessionProcess.send_message` writes to `process.stdin` and awaits
`drain()` without guarding against the subprocess having exited between
the `start()` call and the actual write. In a normal lifecycle
`_read_loop` observes the exit and emits a "stopped" / "crashed" status,
but there is a window where the caller (FastAPI/websocket handler) will
otherwise see a raw `BrokenPipeError` propagate out of `send_message` —
crashing the request and leaving any attached websocket clients with no
diagnostic.

Wrap the write+drain pair in a try/except for `BrokenPipeError` /
`ConnectionResetError`, log a warning with the process returncode, and
emit an "error" status with reason="stdin_broken" so subscribers learn
about the failure synchronously instead of via the eventual read-loop
exit message.

No behavior change on the happy path.
Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 07c10f9dcf

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment on lines +677 to +681
await self._emit_status(
"error",
reason="stdin_broken",
detail=f"{e.__class__.__name__}: {e}",
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Clear in-flight prompts before emitting stdin_broken

When the broken stdin write is for a JSONRPCPromptMessage, the prompt id was already added to _in_flight_prompt_ids, but this new error path emits the stdin_broken status without clearing it. That makes the session report an error while is_busy remains true, so clients reacting immediately to the status can still be rejected by paths such as get_editable_session()'s busy check until _read_loop later catches up; the existing EOF/error paths explicitly clear in-flight ids before broadcasting for this reason. Clear _in_flight_prompt_ids before this _emit_status call.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 1 potential issue.

View 3 additional findings in Devin Review.

Open in Devin Review

Comment on lines +667 to +681
except (BrokenPipeError, ConnectionResetError) as e:
# Subprocess died between our `start()` check above and the actual write.
# `_read_loop` will eventually observe the exit and emit "stopped" /
# "crashed", but right now the caller (FastAPI / websocket handler) would
# otherwise see a raw exception propagate to the response. Emit an error
# status so any attached websocket clients see the failure synchronously.
logger.warning(
f"send_message: subprocess stdin {e.__class__.__name__}; "
f"process likely exited (returncode={process.returncode})"
)
await self._emit_status(
"error",
reason="stdin_broken",
detail=f"{e.__class__.__name__}: {e}",
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 In-flight prompt ID not cleaned up on BrokenPipeError leaves session stuck in 'busy' state

When send_message handles a JSONRPCPromptMessage, it adds the prompt ID to _in_flight_prompt_ids at line 647 and emits a "busy" status at line 649 before the stdin write attempt. If the write then fails with BrokenPipeError, the exception handler (lines 667-681) emits an "error" status but never removes the prompt ID from _in_flight_prompt_ids. This leaves is_busy returning True even though the prompt was never delivered to the subprocess.

The comment says _read_loop will eventually clean up, but there's a race window where _read_loop may have already completed its EOF handling (and called clear()) before the ID was added at line 647. In that case, no one clears the orphaned ID until the next start() call or the error-state recovery path in sessions.py:1011-1016. During this window, the session is simultaneously in "error" and "busy" states, and non-prompt messages (like cancel) that check is_busy will behave incorrectly.

Suggested change
except (BrokenPipeError, ConnectionResetError) as e:
# Subprocess died between our `start()` check above and the actual write.
# `_read_loop` will eventually observe the exit and emit "stopped" /
# "crashed", but right now the caller (FastAPI / websocket handler) would
# otherwise see a raw exception propagate to the response. Emit an error
# status so any attached websocket clients see the failure synchronously.
logger.warning(
f"send_message: subprocess stdin {e.__class__.__name__}; "
f"process likely exited (returncode={process.returncode})"
)
await self._emit_status(
"error",
reason="stdin_broken",
detail=f"{e.__class__.__name__}: {e}",
)
except (BrokenPipeError, ConnectionResetError) as e:
# Subprocess died between our `start()` check above and the actual write.
# `_read_loop` will eventually observe the exit and emit "stopped" /
# "crashed", but right now the caller (FastAPI / websocket handler) would
# otherwise see a raw exception propagate to the response. Emit an error
# status so any attached websocket clients see the failure synchronously.
logger.warning(
f"send_message: subprocess stdin {e.__class__.__name__}; "
f"process likely exited (returncode={process.returncode})"
)
self._in_flight_prompt_ids.clear()
await self._emit_status(
"error",
reason="stdin_broken",
detail=f"{e.__class__.__name__}: {e}",
)
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant