Feat/events socket #352
Conversation
Adds the Event enum and RemovedReason for the push-event socket. Wire format is JSONL: snapshot reuses shpool_protocol::Session so the schema matches `shpool list --json`; deltas are flat objects tagged with `type`. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The daemon binds a sibling Unix socket (events.socket) next to the main shpool socket and accepts long-lived subscribers. Each new subscriber receives a snapshot of the session table as its first message, built under the shells lock so subsequent deltas (published in a follow-up change) cannot race the registration. Per-subscriber writer threads with bounded channels and a write timeout isolate slow or stuck consumers from the daemon's hot path; subscribers that fall behind are dropped and re-sync by reconnecting. Extracts collect_sessions from handle_list so the snapshot and the existing `shpool list --json` output share one schema-producing path. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Emits session.created, session.attached, session.detached, and
session.removed (with reason exited|killed) at the seven mutation
sites that change the session table:
* select_shell_desc create path: created + attached
* select_shell_desc reattach path: attached
* handle_attach client-disconnect path: detached
* handle_attach shell-exit path: removed{exited}
* handle_detach: detached
* handle_kill: removed{killed}
* ttl_reaper expiry: removed{killed}
Each publish runs inside the same shells-lock scope as its mutation,
so wire-order matches causal-order for any subscriber. Reaping is
surfaced as `killed` for now; a dedicated reason can be added later
if a use case appears.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Connects to the daemon's events socket and prints each JSON line to stdout, flushing per line so the stream is pipeline-friendly: shpool events | jq 'select(.type == "session.removed")' The first line is a snapshot of the current session table; subsequent lines are deltas. Reconnect to force a fresh snapshot. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Three end-to-end tests, exercising the full daemon → events socket →
JSON wire path:
* snapshot_then_lifecycle: snapshot, then session.created /
.attached / .detached / .removed{killed} as a session is created,
detached (via background mode), and killed.
* snapshot_includes_existing_sessions: a subscriber that connects
after a session already exists receives that session in the
snapshot.
* multiple_subscribers_each_get_independent_streams: two concurrent
subscribers both receive the snapshot and full delta sequence.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Gate the SessionRemoved{Exited} publish in handle_attach's shell-exit
branch on shells.remove() actually returning Some, so a concurrent
kill or reaper that already removed the entry doesn't produce a
duplicate removal event.
- Drop the eager SessionDetached publish in handle_detach. The
bidi-loop unwind path in handle_attach already publishes the matching
event with its own timestamp; emitting it twice was observable to
subscribers. Keep the eager last_disconnected_at write so concurrent
list() callers still see fresh state immediately.
- In select_shell_desc, defer the reattach SessionAttached publish past
the is_finished() check so it isn't emitted for a session about to be
implicitly clobbered by the create path. Have the create path publish
SessionRemoved{Exited} when it overwrites an existing entry, so the
replacement is explicit on the wire.
- Thread the events socket path through signals::Handler so signal
exits clean it up alongside the main socket. Switch to a Vec<PathBuf>
to handle both. Tolerate NotFound on cleanup since the events socket
may not have been bound yet when a signal arrives.
- Drop the unnecessary `pub` on Server.events_bus.
- Apply nightly rustfmt.
Three new integration tests:
- explicit_detach_publishes_one_event: pins the no-duplicate-detached
invariant by using a kill as a known-next-event fence; a duplicate
detached would surface as the next read instead of session.removed.
- signal_exit_unlinks_sockets: SIGTERM the daemon and assert both
socket files are gone.
- reattach_emits_attached_only: regression guard for the reattach path.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA). View this failed invocation of the CLA check for more information. For the most up-to-date status, view the checks section at the bottom of the pull request.
I really appreciate your enthusiasm, but in the future, let's try to do more up-front design before jumping into coding. #346 (comment) explains some more of my thoughts on the matter. Rather than jumping straight to code, can you first post the design you intend to code to, with a particular focus on the contract and API that we'll offer? Basically, I think we should work out and agree on what the developer-facing documentation will look like (it doesn't have to be polished, but we should have a precise spec that a developer of a TUI could code against, and agree on it). Normally an issue would be the place to do that, but we can just do it in this thread and then you can update the PR contents once we've settled on something. Some feedback on what I see in the PR description:
Re less rich events: one goal of the current design was to make it so that an external program can, solely by watching the events socket, maintain a consistent view of the shpool state. This required the snapshot-upon-subscription event (for synchronization) and some metadata (like `name`). So, if we simplify the events, the events stream is no longer sufficient on its own, so it might as well be simplified further to no longer fire any snapshot-upon-subscription event (which also means events.rs doesn't need to build a snapshot of the session table).
Yeah, keeping an Event enum seems like a good idea. I think dropping the snapshot and the extra fields besides type and name seems pretty good. In terms of hiding the socket location, I still think we should document the protocol and socket location. I do think that heavier-duty applications should probably be directly dialing the socket rather than forking a sub-proc, so we should be intentional about making that part of the public API (people will just do it anyway and we'll be on the hook for compatibility, so we might as well be intentional). I think the way you have it, with a sidecar events.socket file, is good.
Maybe we could even get away with dropping `name` as well. Why do you need to be able to track which specific session changed?
I don't need the `name` field. I think the external documentation would just cover the event types and serde-derived JSONL format of the stream, accessible via `shpool events`. Would the documentation live in a top-level EVENTS.md that README.md points to?
Yeah, I think that would be a good place to put the documentation. We probably don't want a section inline in the README, so a separate .md file is a good way to go. That all sounds like a good plan. Do you want to update this PR with that plan in mind? Once you are done, just let me know and I'll review.
Each event now serializes as `{"type":"session.<x>"}` with no other
fields. To learn what the event refers to, subscribers follow up with
`shpool list` (or send `ConnectHeader::List` over the main socket).
- Drop the welcome `snapshot` event; subscribers do their own bootstrap
list call after connecting.
- Drop `name`, timestamps, and `reason` from the lifecycle events.
- Remove `RemovedReason`; reaped/killed/exited share `session.removed`.
- Inline `collect_sessions` back into `handle_list` (the snapshot was
its only other consumer) and drop the now-unused `unix_ms` helper.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Cover the sibling-socket transport, the four event types, the JSONL wire format, the `shpool events` CLI helper plus direct-socket use for heavier-duty consumers, the ordering guarantee (publish under the session-table lock so wire-order matches causal-order), and the slow-subscriber drop policy. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
I've drafted EVENTS.md and stripped out the extra fields. It looks like the CLA check is unhappy about the `Co-Authored-By: Claude` trailer.
Issue Link
#346 (comment)
AI Policy Ack
ack -- pair-programmed with Claude; every line was human-reviewed.
Description
Adds an events socket where the shpool daemon can push events to subscribers. This lets external programs like TUIs stay up to date with the shpool state without needing to poll. The new events socket is located at `<runtime_dir>/shpool/events.socket`, alongside the main socket. The format is JSONL (each event is a one-line JSON object). Events look like `{"type":"session.attached"}`.
This PR also (tentatively) includes `shpool events` (see `fn subscribe_to_stdout`), a QoL subcommand that connects to the events socket, prints each event as it's received, and flushes after each print. This alleviates some boilerplate for downstream users and makes it easy to pipe the output to other programs.
Each subscription spawns a writer thread with its own internal buffer and timeout. This is done to avoid blocking on slow event consumers.