From a7f1cf163e3dbec0ae733bde1c14efe13f435835 Mon Sep 17 00:00:00 2001 From: Helge Tesdal Date: Wed, 29 Apr 2026 15:47:50 +0200 Subject: [PATCH 1/3] refactor(session): move run-events to session/auto-reply/ with sink interface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit RunEvents was never CLI-specific — ACP and TUI headless/daemon will need equivalent auto-permission-reply logic. Moves to src/session/auto-reply/ with a Sink interface that decouples the event-emission concern from the auto-reply contract. jsonMode becomes a CLI-side concern in run.ts's makeRunSink() implementation. The new SessionAutoReply.make is behaviorally equivalent to the old RunEvents.make except for one extension: a new onLivelockWarn sink callback fires alongside the existing log.warn at the same gate so non-CLI sinks (ACP transport, TUI status bar, daemon telemetry) can surface the warning without grepping logs. The CLI sink in run.ts intentionally no-ops onLivelockWarn to preserve the pre-F11 stdout JSON contract (operators only saw auto-reject/auto-approve events). JSON shape stability is now pinned by test/cli/run-make-sink.test.ts, which exercises makeRunSink in isolation and asserts exact field names (autoRejectSessionID/totalAutoRejects/autoApproveSessionID/ totalAutoApproves/sessionID/timestamp/kind/type). Operator pipelines parsing this output keep working. The variable runEventsHandle and field hasRunEventsHandle in run.ts / dispatchPermissionAsked are intentionally left unrenamed: the names are now slightly misleading but renaming would ripple into 9 exported unit tests in run-attach-permission.test.ts. The F10 doc trail narrative paragraphs were updated to refer to SessionAutoReply. ACP unification deferred to a follow-up PR (see TODO(auto-reply-acp) in src/session/auto-reply/auto-reply.ts). Addresses audit finding F11 (Opus diamond review, 2026-04-22) — biggest upstream blocker. Diamond review: codex-5.3 spec + general (Opus) quality, both APPROVE after rework that added test/cli/run-make-sink.test.ts to pin the JSON shape contract. --- packages/opencode/src/cli/cmd/run.ts | 97 +++++-- .../auto-reply/auto-reply.ts} | 38 ++- .../opencode/src/session/auto-reply/sink.ts | 38 +++ .../opencode/test/cli/run-make-sink.test.ts | 123 +++++++++ .../auto-reply.test.ts} | 241 ++++++++---------- .../session/subagent-hang-regression.test.ts | 6 +- 6 files changed, 358 insertions(+), 185 deletions(-) rename packages/opencode/src/{cli/cmd/run-events.ts => session/auto-reply/auto-reply.ts} (85%) create mode 100644 packages/opencode/src/session/auto-reply/sink.ts create mode 100644 packages/opencode/test/cli/run-make-sink.test.ts rename packages/opencode/test/{cli/run-events.test.ts => session/auto-reply.test.ts} (78%) diff --git a/packages/opencode/src/cli/cmd/run.ts b/packages/opencode/src/cli/cmd/run.ts index 8c2e8214ab5c..01e3651793e6 100644 --- a/packages/opencode/src/cli/cmd/run.ts +++ b/packages/opencode/src/cli/cmd/run.ts @@ -28,7 +28,8 @@ import { TodoWriteTool } from "../../tool/todo" import { Locale } from "../../util" import { AppRuntime } from "@/effect/app-runtime" import { SessionID } from "@/session/schema" -import { RunEvents } from "./run-events" +import { SessionAutoReply } from "@/session/auto-reply/auto-reply" +import { silentSink as silentAutoReplySink, type Sink as AutoReplySink } from "@/session/auto-reply/sink" type ToolProps = { input: Tool.InferParameters @@ -212,19 +213,61 @@ function normalizePath(input?: string) { return input } +/** + * Build the auto-reply Sink for `opencode run`. Decouples emission policy + * (stdout JSON when jsonMode, no-op otherwise) from the auto-reply core in + * `src/session/auto-reply/`. ACP/TUI/daemon will provide their own sinks + * routing to their own transports — see TODO(auto-reply-acp) in auto-reply.ts. + * + * Exported for unit-test access only — operators rely on the JSON shape + * (`autoRejectSessionID`/`totalAutoRejects`/etc) emitted under jsonMode, so + * this builder pins that external CLI contract independently of the sink + * callback signature. + */ +export function makeRunSink(jsonMode: boolean, rootSessionID: SessionID): AutoReplySink { + // The non-jsonMode case has no UI side: the dispatchPermissionAsked path + // handles the per-event UI line, livelock warnings already log via the core + // log.warn, and stats counters live on the returned Handle. silentSink is + // the right object — reuse it instead of duplicating the shape. + if (!jsonMode) return silentAutoReplySink + const emit = (type: string, data: Record) => { + process.stdout.write(JSON.stringify({ type, timestamp: Date.now(), sessionID: rootSessionID, ...data }) + "\n") + } + return { + onAutoReject: (input) => + emit("auto-reject", { + kind: input.kind, + autoRejectSessionID: input.sessionID, + totalAutoRejects: input.total, + }), + onAutoApprove: (input) => + emit("auto-approve", { + kind: input.kind, + autoApproveSessionID: input.sessionID, + totalAutoApproves: input.total, + }), + // No JSON event for livelock warnings: the core already log.warn's at the + // same gate, and the operator-facing JSON contract historically (under the + // old RunEvents.emit) only emitted auto-reject/auto-approve. Keeping + // livelock log-only preserves that contract under F11 extraction. + onLivelockWarn: () => {}, + } +} + /** * Reply to a `permission.asked` SSE event in attach mode. * - * Coupling note: in non-attach mode `RunEvents.make` runs in-process alongside - * `prompt.loop` and owns the auto-reply contract for the root session and its - * descendants (it is *local* to this CLI process, not server-side). In attach - * mode, the local CLI is just an SSE viewer of a remote opencode server, and - * the remote server does not currently spin up its own RunEvents handler — - * so this function is the only auto-responder for permission asks visible to - * the local user. If a future change makes the remote server attach-aware - * (i.e., it runs its own RunEvents per attached client), this helper becomes - * a redundant double-responder and must be removed (along with the dispatch - * in `dispatchPermissionAsked` and its call site in run.ts's SSE loop). + * Coupling note: in non-attach mode `SessionAutoReply.make` runs in-process + * alongside `prompt.loop` and owns the auto-reply contract for the root + * session and its descendants (it is *local* to this CLI process, not + * server-side). In attach mode, the local CLI is just an SSE viewer of a + * remote opencode server, and the remote server does not currently spin up + * its own auto-reply handler — so this function is the only auto-responder + * for permission asks visible to the local user. If a future change makes + * the remote server attach-aware (i.e., it runs its own auto-reply per + * attached client), this helper becomes a redundant double-responder and + * must be removed (along with the dispatch in `dispatchPermissionAsked` and + * its call site in run.ts's SSE loop). * * Behavior matrix for attach mode: * - skipPermissions=true → reply "once" (silent; symmetric with auto-approve flow) @@ -235,8 +278,8 @@ function normalizePath(input?: string) { * * Followup (non-blocking): attach + jsonMode silently auto-rejects without * emitting an `auto-reject` JSON event (non-attach mode emits one via - * RunEvents). Reaching parity would require either an attach-side JSON - * emitter here or moving JSON emission into a sink that both modes share. + * SessionAutoReply's sink). Reaching parity would require either an + * attach-side JSON emitter here or routing both modes through the same sink. * Out of scope for F10 (which only collapses the dual permission paths). * * Each invocation produces exactly one `sdk.permission.reply` call. Caller @@ -273,9 +316,9 @@ export async function replyPermissionAttachMode(input: { /** * Dispatch a `permission.asked` SSE event to either the no-op-with-log path - * (non-attach: `runEventsHandle` is set, in-process RunEvents owns the reply) - * or the attach-mode reply path (`runEventsHandle` is null, this client must - * reply via SDK). + * (non-attach: `runEventsHandle` is set, in-process SessionAutoReply owns the + * reply) or the attach-mode reply path (`runEventsHandle` is null, this + * client must reply via SDK). * * Exported for unit-test access only — the call site is `run.ts`'s SSE loop. * Keeping it exported gives the F10 dual-path invariant a testable seam without @@ -283,7 +326,7 @@ export async function replyPermissionAttachMode(input: { * * Invariant: `hasRunEventsHandle === !args.attach` (enforced at the * `runEventsHandle` ternary in run.ts; see comment near construction). If that - * invariant ever drifts — e.g. attach mode also gets a server-side RunEvents + * invariant ever drifts — e.g. attach mode also gets a server-side auto-reply * — the dual-responder race F10 was raised against returns. The unit tests * for this dispatch pin the contract: at most one `sdk.permission.reply` per * `permission.asked` event. @@ -308,10 +351,10 @@ export async function dispatchPermissionAsked(input: { if (input.permission.sessionID !== input.sessionID) return false if (input.hasRunEventsHandle) { - // Non-attach mode: in-process RunEvents owns the auto-reply contract; - // here we only surface a UI line (skipped under dangerously-skip-permissions - // and under jsonMode, where the matching auto-reject JSON event is emitted - // by RunEvents instead). + // Non-attach mode: in-process SessionAutoReply owns the auto-reply + // contract; here we only surface a UI line (skipped under + // dangerously-skip-permissions and under jsonMode, where the matching + // auto-reject JSON event is emitted by SessionAutoReply's sink instead). if (!input.skipPermissions && !input.jsonMode) { input.println( `permission requested: ${input.permission.permission} (${input.permission.patterns.join(", ")}); auto-rejecting`, @@ -749,11 +792,13 @@ export const RunCommand = cmd({ const runEventsHandle = args.attach ? null : await AppRuntime.runPromise( - RunEvents.make({ - rootSessionID: sessionID, - skipPermissions: args["dangerously-skip-permissions"] === true, - jsonMode, - }), + SessionAutoReply.make( + { + rootSessionID: sessionID, + skipPermissions: args["dangerously-skip-permissions"] === true, + }, + makeRunSink(jsonMode, sessionID), + ), ) try { diff --git a/packages/opencode/src/cli/cmd/run-events.ts b/packages/opencode/src/session/auto-reply/auto-reply.ts similarity index 85% rename from packages/opencode/src/cli/cmd/run-events.ts rename to packages/opencode/src/session/auto-reply/auto-reply.ts index 2ec97ecaa769..0f94dde5d617 100644 --- a/packages/opencode/src/cli/cmd/run-events.ts +++ b/packages/opencode/src/session/auto-reply/auto-reply.ts @@ -6,31 +6,35 @@ import { Session } from "@/session" import { NotFoundError } from "@/storage" import { SessionID } from "@/session/schema" import { Log } from "@/util" +import type { Sink } from "./sink" -const log = Log.create({ service: "run-events" }) +const log = Log.create({ service: "session-auto-reply" }) export const LIVELOCK_WARN_THRESHOLD = 5 export const MAX_LINEAGE_DEPTH = 32 -export interface Config { +export type Config = { rootSessionID: SessionID skipPermissions: boolean - jsonMode: boolean } -export interface Stats { +export type Stats = { autoRejectedQuestions: number autoRejectedPermissions: number autoApprovedPermissions: number livelockWarned: boolean } -export interface Handle { +export type Handle = { readonly stats: Stats readonly unsubscribe: () => void } -export const make = Effect.fn("RunEvents.make")(function* (config: Config) { +// TODO(auto-reply-acp): ACP has equivalent auto-permission-reply logic in +// src/acp/*. Unifying it on top of this core (so ACP also goes through Sink) +// is deferred to a follow-up PR — F11 only extracts CLI's RunEvents into +// reusable shape. One subsystem rework at a time. +export const make = Effect.fn("SessionAutoReply.make")(function* (config: Config, sink: Sink) { const question = yield* Question.Service const permission = yield* Permission.Service const bus = yield* Bus.Service @@ -45,14 +49,7 @@ export const make = Effect.fn("RunEvents.make")(function* (config: Config) { const descendants = new Set([config.rootSessionID]) - const emit = (type: string, data: Record) => { - if (!config.jsonMode) return - process.stdout.write( - JSON.stringify({ type, timestamp: Date.now(), sessionID: config.rootSessionID, ...data }) + "\n", - ) - } - - const isDescendant = Effect.fn("RunEvents.isDescendant")(function* (sid: SessionID) { + const isDescendant = Effect.fn("SessionAutoReply.isDescendant")(function* (sid: SessionID) { if (descendants.has(sid)) return true let cur: SessionID | undefined = sid const chain: SessionID[] = [] @@ -79,12 +76,13 @@ export const make = Effect.fn("RunEvents.make")(function* (config: Config) { if (kind === "question") stats.autoRejectedQuestions++ else stats.autoRejectedPermissions++ const total = stats.autoRejectedQuestions + stats.autoRejectedPermissions - emit("auto-reject", { kind, autoRejectSessionID: sid, totalAutoRejects: total }) + sink.onAutoReject({ kind, sessionID: sid, total }) if (!stats.livelockWarned && total > LIVELOCK_WARN_THRESHOLD) { stats.livelockWarned = true log.warn("possible subagent livelock: >5 auto-rejects in a single run", { rootSessionID: config.rootSessionID, }) + sink.onLivelockWarn({ rootSessionID: config.rootSessionID }) } } @@ -95,11 +93,7 @@ export const make = Effect.fn("RunEvents.make")(function* (config: Config) { // detect auto-reject loops. const bumpApprove = (sid: SessionID) => { stats.autoApprovedPermissions++ - emit("auto-approve", { - kind: "permission", - autoApproveSessionID: sid, - totalAutoApproves: stats.autoApprovedPermissions, - }) + sink.onAutoApprove({ kind: "permission", sessionID: sid, total: stats.autoApprovedPermissions }) } // bus.subscribeCallback wraps the callback in an Effect.tryPromise-based @@ -110,7 +104,7 @@ export const make = Effect.fn("RunEvents.make")(function* (config: Config) { // subagent loops with many simultaneous descendants. Defects inside the forked // fiber do not surface through that subscription callback wrapper, so log them // here instead. Track in-flight fibers so unsubscribe() can interrupt them and - // bound handler work to the RunEvents lifecycle. + // bound handler work to the AutoReply lifecycle. const inflight = new Set>() let closed = false const fork = (effect: Effect.Effect) => { @@ -178,4 +172,4 @@ export const make = Effect.fn("RunEvents.make")(function* (config: Config) { return { stats, unsubscribe } satisfies Handle }) -export * as RunEvents from "./run-events" +export * as SessionAutoReply from "./auto-reply" diff --git a/packages/opencode/src/session/auto-reply/sink.ts b/packages/opencode/src/session/auto-reply/sink.ts new file mode 100644 index 000000000000..d35cec482569 --- /dev/null +++ b/packages/opencode/src/session/auto-reply/sink.ts @@ -0,0 +1,38 @@ +import type { SessionID } from "../schema" + +/** + * Sink interface for auto-reply telemetry. The auto-reply core is intentionally + * unaware of how/where these events surface (stdout JSON, structured logger, + * test capture, ACP transport, …) so it can be reused across CLI run mode, + * future TUI headless / daemon mode, and ACP without dragging emission policy + * into the bus-subscription core. + * + * Each method is fire-and-forget — synchronous, no return value. Implementations + * that need async work (network emit, etc.) should fire-and-forget internally; + * the bus dispatch path can not block on emission. + */ +export type Sink = { + readonly onAutoReject: (input: { + readonly kind: "question" | "permission" + readonly sessionID: SessionID + readonly total: number + }) => void + readonly onAutoApprove: (input: { + readonly kind: "permission" + readonly sessionID: SessionID + readonly total: number + }) => void + readonly onLivelockWarn: (input: { readonly rootSessionID: SessionID }) => void +} + +/** + * No-op sink. Intended for tests and contexts that only care about the Stats + * counters on the returned handle (e.g. F12 regression-test reuse). + */ +export const silentSink: Sink = { + onAutoReject: () => {}, + onAutoApprove: () => {}, + onLivelockWarn: () => {}, +} + +export * as AutoReplySink from "./sink" diff --git a/packages/opencode/test/cli/run-make-sink.test.ts b/packages/opencode/test/cli/run-make-sink.test.ts new file mode 100644 index 000000000000..5b69524d132a --- /dev/null +++ b/packages/opencode/test/cli/run-make-sink.test.ts @@ -0,0 +1,123 @@ +import { describe, expect, test } from "bun:test" +import { makeRunSink } from "../../src/cli/cmd/run" +import { SessionID } from "../../src/session/schema" +import { silentSink as silentAutoReplySink } from "../../src/session/auto-reply/sink" + +// F11 contract: makeRunSink translates the auto-reply Sink callbacks into the +// operator-facing JSON envelope written under `--output-format=json`. Operators +// have shell pipelines built against these field names; renaming any field is +// a breaking change to the CLI's external contract. This file pins the shape +// independently of the SessionAutoReply core's sink callback signature. +// +// Historical context: prior to F11 the JSON emission lived inline in +// run-events.ts's `emit()` helper, and the run-events.test.ts file pinned the +// shape by monkey-patching process.stdout. After F11 the sink interface +// removed jsonMode from the core, so the JSON shape now lives in run.ts and +// needs its own targeted test. + +const ROOT = SessionID.make("ses_root_make_sink_test_000000") + +describe("cli/run makeRunSink", () => { + test("non-jsonMode returns the shared silentSink (no allocation, no emission)", () => { + const sink = makeRunSink(false, ROOT) + expect(sink).toBe(silentAutoReplySink) + }) + + test("jsonMode emits auto-reject with stable field names", () => { + const writes: string[] = [] + const original = process.stdout.write.bind(process.stdout) + process.stdout.write = ((chunk: string | Uint8Array) => { + writes.push(typeof chunk === "string" ? chunk : Buffer.from(chunk).toString("utf8")) + return true + }) as typeof process.stdout.write + + try { + const sink = makeRunSink(true, ROOT) + const childSessionID = SessionID.make("ses_child_make_sink_test_00000") + sink.onAutoReject({ kind: "question", sessionID: childSessionID, total: 3 }) + } finally { + process.stdout.write = original + } + + expect(writes).toHaveLength(1) + const payload = JSON.parse(writes[0].trim()) as Record + expect(payload.type).toBe("auto-reject") + expect(typeof payload.timestamp).toBe("number") + expect(payload.sessionID).toBe(ROOT) + expect(payload.kind).toBe("question") + expect(payload.autoRejectSessionID).toBe("ses_child_make_sink_test_00000") + expect(payload.totalAutoRejects).toBe(3) + expect(Object.keys(payload).sort()).toEqual( + ["autoRejectSessionID", "kind", "sessionID", "timestamp", "totalAutoRejects", "type"].sort(), + ) + }) + + test("jsonMode emits auto-approve with stable field names", () => { + const writes: string[] = [] + const original = process.stdout.write.bind(process.stdout) + process.stdout.write = ((chunk: string | Uint8Array) => { + writes.push(typeof chunk === "string" ? chunk : Buffer.from(chunk).toString("utf8")) + return true + }) as typeof process.stdout.write + + try { + const sink = makeRunSink(true, ROOT) + sink.onAutoApprove({ kind: "permission", sessionID: ROOT, total: 1 }) + } finally { + process.stdout.write = original + } + + expect(writes).toHaveLength(1) + const payload = JSON.parse(writes[0].trim()) as Record + expect(payload.type).toBe("auto-approve") + expect(typeof payload.timestamp).toBe("number") + expect(payload.sessionID).toBe(ROOT) + expect(payload.kind).toBe("permission") + expect(payload.autoApproveSessionID).toBe(ROOT) + expect(payload.totalAutoApproves).toBe(1) + expect(Object.keys(payload).sort()).toEqual( + ["autoApproveSessionID", "kind", "sessionID", "timestamp", "totalAutoApproves", "type"].sort(), + ) + }) + + test("jsonMode does NOT emit a JSON line for onLivelockWarn (preserves pre-F11 behavior; log.warn handles it)", () => { + const writes: string[] = [] + const original = process.stdout.write.bind(process.stdout) + process.stdout.write = ((chunk: string | Uint8Array) => { + writes.push(typeof chunk === "string" ? chunk : Buffer.from(chunk).toString("utf8")) + return true + }) as typeof process.stdout.write + + try { + const sink = makeRunSink(true, ROOT) + sink.onLivelockWarn({ rootSessionID: ROOT }) + } finally { + process.stdout.write = original + } + + expect(writes).toHaveLength(0) + }) + + test("jsonMode emits one separate line per call (newline-delimited JSON)", () => { + const writes: string[] = [] + const original = process.stdout.write.bind(process.stdout) + process.stdout.write = ((chunk: string | Uint8Array) => { + writes.push(typeof chunk === "string" ? chunk : Buffer.from(chunk).toString("utf8")) + return true + }) as typeof process.stdout.write + + try { + const sink = makeRunSink(true, ROOT) + sink.onAutoReject({ kind: "question", sessionID: ROOT, total: 1 }) + sink.onAutoReject({ kind: "permission", sessionID: ROOT, total: 2 }) + sink.onAutoApprove({ kind: "permission", sessionID: ROOT, total: 1 }) + } finally { + process.stdout.write = original + } + + expect(writes).toHaveLength(3) + expect(writes.every((w) => w.endsWith("\n"))).toBe(true) + const parsed = writes.map((w) => JSON.parse(w.trim()) as { type: string }) + expect(parsed.map((p) => p.type)).toEqual(["auto-reject", "auto-reject", "auto-approve"]) + }) +}) diff --git a/packages/opencode/test/cli/run-events.test.ts b/packages/opencode/test/session/auto-reply.test.ts similarity index 78% rename from packages/opencode/test/cli/run-events.test.ts rename to packages/opencode/test/session/auto-reply.test.ts index 5a392e956cfa..28151003217e 100644 --- a/packages/opencode/test/cli/run-events.test.ts +++ b/packages/opencode/test/session/auto-reply.test.ts @@ -9,7 +9,8 @@ import { Permission } from "../../src/permission" import { Session } from "../../src/session" import { Bus } from "../../src/bus" import { SessionID } from "../../src/session/schema" -import { MAX_LINEAGE_DEPTH, RunEvents } from "../../src/cli/cmd/run-events" +import { MAX_LINEAGE_DEPTH, SessionAutoReply } from "../../src/session/auto-reply/auto-reply" +import { AutoReplySink } from "../../src/session/auto-reply/sink" const it = testEffect( Layer.mergeAll( @@ -21,17 +22,36 @@ const it = testEffect( ), ) -describe("cli/run-events", () => { - it.live("auto-rejects question.asked for the root session (non-attach, non-json)", () => +// Test sink that captures every Sink event into a single ordered array. Lets +// jsonMode-equivalent tests assert on the auto-reject/auto-approve contract +// without monkey-patching process.stdout (the previous RunEvents test pattern). +type CapturedEvent = + | { readonly type: "auto-reject"; readonly kind: "question" | "permission"; readonly sessionID: SessionID; readonly total: number } + | { readonly type: "auto-approve"; readonly kind: "permission"; readonly sessionID: SessionID; readonly total: number } + | { readonly type: "livelock-warn"; readonly rootSessionID: SessionID } + +function captureSink() { + const events: CapturedEvent[] = [] + const sink: AutoReplySink.Sink = { + onAutoReject: (input) => + events.push({ type: "auto-reject", kind: input.kind, sessionID: input.sessionID, total: input.total }), + onAutoApprove: (input) => + events.push({ type: "auto-approve", kind: input.kind, sessionID: input.sessionID, total: input.total }), + onLivelockWarn: (input) => events.push({ type: "livelock-warn", rootSessionID: input.rootSessionID }), + } + return { sink, events } +} + +describe("session/auto-reply", () => { + it.live("auto-rejects question.asked for the root session", () => provideTmpdirInstance(() => Effect.gen(function* () { const question = yield* Question.Service const rootSessionID = SessionID.make("ses_root_0000000000000000000000") - const handler = yield* RunEvents.make({ - rootSessionID, - skipPermissions: false, - jsonMode: false, - }) + const handler = yield* SessionAutoReply.make( + { rootSessionID, skipPermissions: false }, + AutoReplySink.silentSink, + ) const result = yield* Effect.exit( question.ask({ @@ -62,11 +82,10 @@ describe("cli/run-events", () => { const session = yield* Session.Service const rootSessionID = SessionID.make("ses_root_desc_0000000000000000") const child = yield* session.create({ parentID: rootSessionID, title: "Child" }) - const handler = yield* RunEvents.make({ - rootSessionID, - skipPermissions: false, - jsonMode: false, - }) + const handler = yield* SessionAutoReply.make( + { rootSessionID, skipPermissions: false }, + AutoReplySink.silentSink, + ) const result = yield* Effect.exit( question.ask({ @@ -98,11 +117,10 @@ describe("cli/run-events", () => { const rootSessionID = SessionID.make("ses_root_grandchild_0000000000000") const middle = yield* session.create({ parentID: rootSessionID, title: "Middle" }) const child = yield* session.create({ parentID: middle.id, title: "Grandchild" }) - const handler = yield* RunEvents.make({ - rootSessionID, - skipPermissions: false, - jsonMode: false, - }) + const handler = yield* SessionAutoReply.make( + { rootSessionID, skipPermissions: false }, + AutoReplySink.silentSink, + ) const childResult = yield* Effect.exit( question.ask({ @@ -152,11 +170,10 @@ describe("cli/run-events", () => { const question = yield* Question.Service const rootSessionID = SessionID.make("ses_root_unrelated_000000000000") const unrelatedSessionID = SessionID.make("ses_unrelated_000000000000000") - const handler = yield* RunEvents.make({ - rootSessionID, - skipPermissions: false, - jsonMode: false, - }) + const handler = yield* SessionAutoReply.make( + { rootSessionID, skipPermissions: false }, + AutoReplySink.silentSink, + ) const fiber = yield* question .ask({ @@ -192,11 +209,10 @@ describe("cli/run-events", () => { const question = yield* Question.Service const session = yield* Session.Service const rootSessionID = SessionID.make("ses_root_depth_cutoff_000000000000") - const handler = yield* RunEvents.make({ - rootSessionID, - skipPermissions: false, - jsonMode: false, - }) + const handler = yield* SessionAutoReply.make( + { rootSessionID, skipPermissions: false }, + AutoReplySink.silentSink, + ) const createDeepChild = (parentID: SessionID, remaining: number): Effect.Effect => { if (remaining === 0) return Effect.succeed(parentID) @@ -233,21 +249,21 @@ describe("cli/run-events", () => { ), ) - it.live("RunEvents.Config does not expose an attach field", () => + it.live("Config does not expose attach or jsonMode fields", () => Effect.sync(() => { - const validConfig: RunEvents.Config = { + const validConfig: SessionAutoReply.Config = { rootSessionID: SessionID.make("ses_root_cfg_0000000000000000000"), skipPermissions: false, - jsonMode: false, } void validConfig - const invalidConfig: RunEvents.Config = { + const invalidConfig: SessionAutoReply.Config = { rootSessionID: SessionID.make("ses_root_cfg_0000000000000000001"), skipPermissions: false, - jsonMode: false, - // @ts-expect-error attach mode is represented by not creating RunEvents + // @ts-expect-error attach mode is represented by not creating SessionAutoReply, + // and jsonMode is now a caller (run.ts) sink concern, not core config attach: true, + jsonMode: true, } void invalidConfig }), @@ -260,11 +276,10 @@ describe("cli/run-events", () => { const session = yield* Session.Service const rootSessionID = SessionID.make("ses_root_perm_desc_000000000000") const child = yield* session.create({ parentID: rootSessionID, title: "Child" }) - const handler = yield* RunEvents.make({ - rootSessionID, - skipPermissions: false, - jsonMode: false, - }) + const handler = yield* SessionAutoReply.make( + { rootSessionID, skipPermissions: false }, + AutoReplySink.silentSink, + ) const fiber = yield* permission .ask({ @@ -294,11 +309,10 @@ describe("cli/run-events", () => { const session = yield* Session.Service const rootSessionID = SessionID.make("ses_root_skip_perm_000000000000") const child = yield* session.create({ parentID: rootSessionID, title: "Child" }) - const handler = yield* RunEvents.make({ - rootSessionID, - skipPermissions: true, - jsonMode: false, - }) + const handler = yield* SessionAutoReply.make( + { rootSessionID, skipPermissions: true }, + AutoReplySink.silentSink, + ) const exit = yield* Effect.exit( permission.ask({ @@ -329,11 +343,10 @@ describe("cli/run-events", () => { const unrelatedRootSessionID = SessionID.make("ses_unrelated_root_000000000000") const x = yield* session.create({ parentID: unrelatedRootSessionID, title: "X" }) const y = yield* session.create({ parentID: x.id, title: "Y" }) - const handler = yield* RunEvents.make({ - rootSessionID, - skipPermissions: false, - jsonMode: false, - }) + const handler = yield* SessionAutoReply.make( + { rootSessionID, skipPermissions: false }, + AutoReplySink.silentSink, + ) const askPermission = (sessionID: SessionID) => permission.ask({ @@ -384,11 +397,10 @@ describe("cli/run-events", () => { const unsubscribeReply = yield* bus.subscribeCallback(Permission.Event.Replied, (evt) => { replies.push({ sessionID: evt.properties.sessionID, reply: evt.properties.reply }) }) - const handler = yield* RunEvents.make({ - rootSessionID, - skipPermissions: true, - jsonMode: false, - }) + const handler = yield* SessionAutoReply.make( + { rootSessionID, skipPermissions: true }, + AutoReplySink.silentSink, + ) const exit = yield* Effect.exit( permission.ask({ @@ -420,24 +432,19 @@ describe("cli/run-events", () => { ), ) - it.live("emits structured JSON event to stdout when jsonMode=true", () => + // F11: jsonMode emission moved out of the core into run.ts's sink. The core + // contract is now "Sink.onAutoReject is invoked with the right shape" — JSON + // serialization is verified separately by the run.ts sink builder. This + // replaces the previous stdout-monkey-patching jsonMode test. + it.live("calls Sink.onAutoReject with kind='question' on auto-reject", () => provideTmpdirInstance(() => Effect.gen(function* () { const question = yield* Question.Service - const rootSessionID = SessionID.make("ses_root_json_00000000000000000000") - const writes: string[] = [] - const originalWrite = process.stdout.write.bind(process.stdout) - process.stdout.write = ((chunk: string | Uint8Array) => { - writes.push(typeof chunk === "string" ? chunk : Buffer.from(chunk).toString("utf8")) - return true - }) as typeof process.stdout.write + const rootSessionID = SessionID.make("ses_root_sink_reject_000000000000") + const captured = captureSink() yield* Effect.acquireUseRelease( - RunEvents.make({ - rootSessionID, - skipPermissions: false, - jsonMode: true, - }), + SessionAutoReply.make({ rootSessionID, skipPermissions: false }, captured.sink), (handle) => Effect.gen(function* () { const result = yield* Effect.exit( @@ -456,43 +463,29 @@ describe("cli/run-events", () => { expect(handle.stats.autoRejectedQuestions).toBe(1) }), (handle) => Effect.sync(() => handle.unsubscribe()), - ).pipe( - Effect.ensuring( - Effect.sync(() => { - process.stdout.write = originalWrite - }), - ), ) - const payload = JSON.parse((writes[0] ?? "").trim()) as { - type: string - timestamp: number - sessionID: string - kind: string - autoRejectSessionID: string - totalAutoRejects: number - } - - expect(payload.type).toBe("auto-reject") - expect(typeof payload.timestamp).toBe("number") - expect(payload.sessionID).toBe(rootSessionID) - expect(payload.kind).toBe("question") - expect(payload.autoRejectSessionID).toBe(rootSessionID) - expect(payload.totalAutoRejects).toBe(1) + expect(captured.events).toHaveLength(1) + expect(captured.events[0]).toEqual({ + type: "auto-reject", + kind: "question", + sessionID: rootSessionID, + total: 1, + }) }), ), ) - it.live("sets livelockWarned=true on the 6th cumulative auto-reject", () => + it.live("sets livelockWarned=true and calls onLivelockWarn on the 6th cumulative auto-reject", () => provideTmpdirInstance(() => Effect.gen(function* () { const question = yield* Question.Service const rootSessionID = SessionID.make("ses_root_livelock_000000000000000") - const handler = yield* RunEvents.make({ - rootSessionID, - skipPermissions: false, - jsonMode: false, - }) + const captured = captureSink() + const handler = yield* SessionAutoReply.make( + { rootSessionID, skipPermissions: false }, + captured.sink, + ) const askOnce = () => Effect.exit( @@ -513,12 +506,17 @@ describe("cli/run-events", () => { }) expect(firstFiveExits.every(Exit.isFailure)).toBe(true) expect(handler.stats.livelockWarned).toBe(false) + expect(captured.events.filter((e) => e.type === "livelock-warn")).toHaveLength(0) const sixthExit = yield* askOnce() expect(Exit.isFailure(sixthExit)).toBe(true) expect(handler.stats.autoRejectedQuestions).toBe(6) expect(handler.stats.livelockWarned).toBe(true) + const livelockEvents = captured.events.filter((e) => e.type === "livelock-warn") + expect(livelockEvents).toHaveLength(1) + expect(livelockEvents[0]).toEqual({ type: "livelock-warn", rootSessionID }) + yield* Effect.sync(() => handler.unsubscribe()) }), ), @@ -533,11 +531,10 @@ describe("cli/run-events", () => { Effect.gen(function* () { const question = yield* Question.Service const rootSessionID = SessionID.make("ses_root_post_unsub_000000000000") - const handler = yield* RunEvents.make({ - rootSessionID, - skipPermissions: false, - jsonMode: false, - }) + const handler = yield* SessionAutoReply.make( + { rootSessionID, skipPermissions: false }, + AutoReplySink.silentSink, + ) yield* Effect.sync(() => handler.unsubscribe()) // Ask after unsubscribe — without the bus subscription, no auto-reject @@ -571,9 +568,9 @@ describe("cli/run-events", () => { ) // F8: when skipPermissions=true the auto-approve branch must produce symmetric - // telemetry — Stats counter + JSON event — so operators running + // telemetry — Stats counter + Sink event — so operators running // --dangerously-skip-permissions get an audit trail of what was approved. - it.live("increments autoApprovedPermissions and emits JSON event when skipPermissions=true", () => + it.live("increments autoApprovedPermissions and calls Sink.onAutoApprove when skipPermissions=true", () => provideTmpdirInstance(() => Effect.gen(function* () { const permission = yield* Permission.Service @@ -583,20 +580,10 @@ describe("cli/run-events", () => { const unsubscribeReply = yield* bus.subscribeCallback(Permission.Event.Replied, (evt) => { replies.push({ sessionID: evt.properties.sessionID, reply: evt.properties.reply }) }) - - const writes: string[] = [] - const originalWrite = process.stdout.write.bind(process.stdout) - process.stdout.write = ((chunk: string | Uint8Array) => { - writes.push(typeof chunk === "string" ? chunk : Buffer.from(chunk).toString("utf8")) - return true - }) as typeof process.stdout.write + const captured = captureSink() yield* Effect.acquireUseRelease( - RunEvents.make({ - rootSessionID, - skipPermissions: true, - jsonMode: true, - }), + SessionAutoReply.make({ rootSessionID, skipPermissions: true }, captured.sink), (handle) => Effect.gen(function* () { const exit = yield* Effect.exit( @@ -625,29 +612,15 @@ describe("cli/run-events", () => { unsubscribeReply() handle.unsubscribe() }), - ).pipe( - Effect.ensuring( - Effect.sync(() => { - process.stdout.write = originalWrite - }), - ), ) - const payload = JSON.parse((writes[0] ?? "").trim()) as { - type: string - timestamp: number - sessionID: string - kind: string - autoApproveSessionID: string - totalAutoApproves: number - } - - expect(payload.type).toBe("auto-approve") - expect(typeof payload.timestamp).toBe("number") - expect(payload.sessionID).toBe(rootSessionID) - expect(payload.kind).toBe("permission") - expect(payload.autoApproveSessionID).toBe(rootSessionID) - expect(payload.totalAutoApproves).toBe(1) + expect(captured.events).toHaveLength(1) + expect(captured.events[0]).toEqual({ + type: "auto-approve", + kind: "permission", + sessionID: rootSessionID, + total: 1, + }) }), ), ) diff --git a/packages/opencode/test/session/subagent-hang-regression.test.ts b/packages/opencode/test/session/subagent-hang-regression.test.ts index b97151283f9a..22569f581347 100644 --- a/packages/opencode/test/session/subagent-hang-regression.test.ts +++ b/packages/opencode/test/session/subagent-hang-regression.test.ts @@ -5,7 +5,7 @@ // and surfaces as a `retry` SessionStatus. Gates against indefinite hangs. // 2. Subagent question in headless: Phase B's Question→Bus publish + // Question.reject→Deferred.fail contract must allow an external -// subscriber (mirroring RunEvents) to unblock a subagent question tool. +// subscriber (mirroring SessionAutoReply) to unblock a subagent question tool. // Gates against headless deadlock when the user can't answer. import { expect } from "bun:test" @@ -173,7 +173,7 @@ it.live( subagent_type: "general", }) // Reply 2 (subagent): call the question tool. Our bus subscriber - // mirrors the RunEvents contract and rejects this question, which + // mirrors the SessionAutoReply contract and rejects this question, which // unblocks the subagent's question tool with RejectedError. yield* input.llm.tool("question", { questions: [ @@ -201,7 +201,7 @@ it.live( }) yield* user(chat.id, "please ask something") - // Mirror of RunEvents.make semantics (see src/cli/cmd/run-events.ts): + // Mirror of SessionAutoReply.make semantics (see src/session/auto-reply/auto-reply.ts): // reject any question or permission raised on a descendant of the // root session. This test is a single root with one subagent, so we // reject indiscriminately — the production handler does parent-chain From 905d68d762d3a4bdd7433bfa091485e61b643aa5 Mon Sep 17 00:00:00 2001 From: Helge Tesdal Date: Wed, 29 Apr 2026 16:26:19 +0200 Subject: [PATCH 2/3] address copilot R1 suppressed feedback (low-confidence) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 3 suppressed comments evaluated: 1. auto-reply.ts:86 sink.onAutoReject before question.reject side effect could break auto-reply if sink throws. Acknowledged as a real concern but pre-F11 RunEvents.emit() had identical exposure (process.stdout.write is the only realistic throw site, e.g. EPIPE) — not an F11 regression. Hardened the Sink contract documentation in sink.ts to explicitly require 'callbacks must not throw': implementations targeting fallible transports (closed pipe, network, full disk) must catch internally. Did NOT add try/catch in the core (AGENTS.md: avoid try/catch where possible; core stays small). 2. auto-reply.ts:97 onAutoApprove same concern. Same resolution: covered by the now-explicit Sink contract. 3. auto-reply.test.ts:267 @ts-expect-error covered only the line below (attach: true), so jsonMode: true was unsuppressed but coincidentally not flagged because TS only reports the first excess-property error per literal. Real risk: if attach is later legitimately added, the suppression goes silent on jsonMode. Fixed by splitting into two separate config literals each with its own ts-expect-error. Verification: typecheck clean, 19/19 tests pass (auto-reply 14 + run-make-sink 5). --- .../opencode/src/session/auto-reply/sink.ts | 8 ++++++++ .../opencode/test/session/auto-reply.test.ts | 18 ++++++++++++++---- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/packages/opencode/src/session/auto-reply/sink.ts b/packages/opencode/src/session/auto-reply/sink.ts index d35cec482569..8d53238d004e 100644 --- a/packages/opencode/src/session/auto-reply/sink.ts +++ b/packages/opencode/src/session/auto-reply/sink.ts @@ -10,6 +10,14 @@ import type { SessionID } from "../schema" * Each method is fire-and-forget — synchronous, no return value. Implementations * that need async work (network emit, etc.) should fire-and-forget internally; * the bus dispatch path can not block on emission. + * + * **Contract: callbacks must not throw.** Sink invocations happen inside the + * auto-reply fiber *before* the `question.reject` / `permission.reply` side + * effects. A throwing sink would fail the fiber and skip the side effect, so + * the auto-reply contract (subagent always gets a response) would silently + * break. If a sink target can fail (closed pipe, network error, full disk), + * the implementation must catch the failure internally and either drop the + * event or buffer it. The core does not wrap callbacks in try/catch. */ export type Sink = { readonly onAutoReject: (input: { diff --git a/packages/opencode/test/session/auto-reply.test.ts b/packages/opencode/test/session/auto-reply.test.ts index 28151003217e..4d18c3c0837a 100644 --- a/packages/opencode/test/session/auto-reply.test.ts +++ b/packages/opencode/test/session/auto-reply.test.ts @@ -257,15 +257,25 @@ describe("session/auto-reply", () => { } void validConfig - const invalidConfig: SessionAutoReply.Config = { + // Two excess-property fields are intentionally invalid here. We pin + // each one with its own ts-expect-error so the test continues to fail + // if either field is accidentally added back to Config (or if TS's + // excess-property error reporting changes its first-error-only behavior). + const invalidWithAttach: SessionAutoReply.Config = { rootSessionID: SessionID.make("ses_root_cfg_0000000000000000001"), skipPermissions: false, - // @ts-expect-error attach mode is represented by not creating SessionAutoReply, - // and jsonMode is now a caller (run.ts) sink concern, not core config + // @ts-expect-error attach mode is represented by not creating SessionAutoReply attach: true, + } + void invalidWithAttach + + const invalidWithJsonMode: SessionAutoReply.Config = { + rootSessionID: SessionID.make("ses_root_cfg_0000000000000000002"), + skipPermissions: false, + // @ts-expect-error jsonMode is now a caller (run.ts) sink concern, not core config jsonMode: true, } - void invalidConfig + void invalidWithJsonMode }), ) From 9374fb0b67373435ecd8e3e9b18035c9977303a6 Mon Sep 17 00:00:00 2001 From: Helge Tesdal Date: Wed, 29 Apr 2026 16:37:53 +0200 Subject: [PATCH 3/3] address copilot R2: makeRunSink honors its own Sink no-throw contract MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit R1 documented 'Sink callbacks must not throw' (sink.ts JSDoc) but the only caller-supplied sink in the codebase (makeRunSink) violated it: process.stdout.write throws EPIPE when a downstream consumer closes the pipe (e.g. `opencode run --output-format=json | head -1`). That would fail the auto-reply fiber before its question.reject / permission.reply side effect runs, breaking the contract that earned the documentation in the first place. Wraps the JSON.stringify+stdout.write in a try/catch with an intentionally empty catch block. AGENTS.md's 'avoid try/catch where possible' is overridden here because this is exactly where contract enforcement belongs — concentrated in the sink implementation rather than spread through the core. The core stays try/catch-free. New test pins the contract: a stdout.write that throws EPIPE must not propagate out of any of the three sink callbacks. --- packages/opencode/src/cli/cmd/run.ts | 12 +++++++++- .../opencode/test/cli/run-make-sink.test.ts | 23 +++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/packages/opencode/src/cli/cmd/run.ts b/packages/opencode/src/cli/cmd/run.ts index 01e3651793e6..842891a3df0c 100644 --- a/packages/opencode/src/cli/cmd/run.ts +++ b/packages/opencode/src/cli/cmd/run.ts @@ -231,7 +231,17 @@ export function makeRunSink(jsonMode: boolean, rootSessionID: SessionID): AutoRe // the right object — reuse it instead of duplicating the shape. if (!jsonMode) return silentAutoReplySink const emit = (type: string, data: Record) => { - process.stdout.write(JSON.stringify({ type, timestamp: Date.now(), sessionID: rootSessionID, ...data }) + "\n") + // Sink contract requires callbacks not to throw (see Sink JSDoc in + // src/session/auto-reply/sink.ts). process.stdout.write can throw on + // EPIPE (downstream consumer closed the pipe), and JSON.stringify is + // safe today but defended against future shape changes. Swallow the + // failure so the auto-reply fiber's question.reject / permission.reply + // side effect still runs. + try { + process.stdout.write(JSON.stringify({ type, timestamp: Date.now(), sessionID: rootSessionID, ...data }) + "\n") + } catch { + // intentionally empty — see contract note above + } } return { onAutoReject: (input) => diff --git a/packages/opencode/test/cli/run-make-sink.test.ts b/packages/opencode/test/cli/run-make-sink.test.ts index 5b69524d132a..6874983ea35b 100644 --- a/packages/opencode/test/cli/run-make-sink.test.ts +++ b/packages/opencode/test/cli/run-make-sink.test.ts @@ -120,4 +120,27 @@ describe("cli/run makeRunSink", () => { const parsed = writes.map((w) => JSON.parse(w.trim()) as { type: string }) expect(parsed.map((p) => p.type)).toEqual(["auto-reject", "auto-reject", "auto-approve"]) }) + + // Sink contract requires callbacks not to throw, otherwise the auto-reply + // fiber would fail before its question.reject / permission.reply side + // effect runs. process.stdout.write throws EPIPE when a downstream + // consumer closes the pipe (e.g. `opencode run --output-format=json | + // head -1`). makeRunSink must swallow that to honor the contract. + test("jsonMode swallows EPIPE-style stdout.write throws (Sink contract)", () => { + const original = process.stdout.write.bind(process.stdout) + process.stdout.write = (() => { + throw Object.assign(new Error("write EPIPE"), { code: "EPIPE" }) + }) as typeof process.stdout.write + + try { + const sink = makeRunSink(true, ROOT) + // None of these calls should throw. If any propagates, the test fails + // via the thrown error and the SessionAutoReply contract is violated. + sink.onAutoReject({ kind: "question", sessionID: ROOT, total: 1 }) + sink.onAutoApprove({ kind: "permission", sessionID: ROOT, total: 1 }) + sink.onLivelockWarn({ rootSessionID: ROOT }) + } finally { + process.stdout.write = original + } + }) })