Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 81 additions & 26 deletions packages/opencode/src/cli/cmd/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import { TodoWriteTool } from "../../tool/todo"
import { Locale } from "../../util"
import { AppRuntime } from "@/effect/app-runtime"
import { SessionID } from "@/session/schema"
import { RunEvents } from "./run-events"
import { SessionAutoReply } from "@/session/auto-reply/auto-reply"
import { silentSink as silentAutoReplySink, type Sink as AutoReplySink } from "@/session/auto-reply/sink"

type ToolProps<T> = {
input: Tool.InferParameters<T>
Expand Down Expand Up @@ -212,19 +213,71 @@ function normalizePath(input?: string) {
return input
}

/**
* Build the auto-reply Sink for `opencode run`. Decouples emission policy
* (stdout JSON when jsonMode, no-op otherwise) from the auto-reply core in
* `src/session/auto-reply/`. ACP/TUI/daemon will provide their own sinks
* routing to their own transports — see TODO(auto-reply-acp) in auto-reply.ts.
*
* Exported for unit-test access only — operators rely on the JSON shape
* (`autoRejectSessionID`/`totalAutoRejects`/etc) emitted under jsonMode, so
* this builder pins that external CLI contract independently of the sink
* callback signature.
*/
export function makeRunSink(jsonMode: boolean, rootSessionID: SessionID): AutoReplySink {
// The non-jsonMode case has no UI side: the dispatchPermissionAsked path
// handles the per-event UI line, livelock warnings already log via the core
// log.warn, and stats counters live on the returned Handle. silentSink is
// the right object — reuse it instead of duplicating the shape.
if (!jsonMode) return silentAutoReplySink
const emit = (type: string, data: Record<string, unknown>) => {
// Sink contract requires callbacks not to throw (see Sink JSDoc in
// src/session/auto-reply/sink.ts). process.stdout.write can throw on
// EPIPE (downstream consumer closed the pipe), and JSON.stringify is
// safe today but defended against future shape changes. Swallow the
// failure so the auto-reply fiber's question.reject / permission.reply
// side effect still runs.
try {
process.stdout.write(JSON.stringify({ type, timestamp: Date.now(), sessionID: rootSessionID, ...data }) + "\n")
} catch {
// intentionally empty — see contract note above
}
}
return {
onAutoReject: (input) =>
emit("auto-reject", {
kind: input.kind,
autoRejectSessionID: input.sessionID,
totalAutoRejects: input.total,
}),
onAutoApprove: (input) =>
emit("auto-approve", {
kind: input.kind,
autoApproveSessionID: input.sessionID,
totalAutoApproves: input.total,
}),
// No JSON event for livelock warnings: the core already log.warn's at the
// same gate, and the operator-facing JSON contract historically (under the
// old RunEvents.emit) only emitted auto-reject/auto-approve. Keeping
// livelock log-only preserves that contract under F11 extraction.
onLivelockWarn: () => {},
}
}

/**
* Reply to a `permission.asked` SSE event in attach mode.
*
* Coupling note: in non-attach mode `RunEvents.make` runs in-process alongside
* `prompt.loop` and owns the auto-reply contract for the root session and its
* descendants (it is *local* to this CLI process, not server-side). In attach
* mode, the local CLI is just an SSE viewer of a remote opencode server, and
* the remote server does not currently spin up its own RunEvents handler —
* so this function is the only auto-responder for permission asks visible to
* the local user. If a future change makes the remote server attach-aware
* (i.e., it runs its own RunEvents per attached client), this helper becomes
* a redundant double-responder and must be removed (along with the dispatch
* in `dispatchPermissionAsked` and its call site in run.ts's SSE loop).
* Coupling note: in non-attach mode `SessionAutoReply.make` runs in-process
* alongside `prompt.loop` and owns the auto-reply contract for the root
* session and its descendants (it is *local* to this CLI process, not
* server-side). In attach mode, the local CLI is just an SSE viewer of a
* remote opencode server, and the remote server does not currently spin up
* its own auto-reply handler — so this function is the only auto-responder
* for permission asks visible to the local user. If a future change makes
* the remote server attach-aware (i.e., it runs its own auto-reply per
* attached client), this helper becomes a redundant double-responder and
* must be removed (along with the dispatch in `dispatchPermissionAsked` and
* its call site in run.ts's SSE loop).
*
* Behavior matrix for attach mode:
* - skipPermissions=true → reply "once" (silent; symmetric with auto-approve flow)
Expand All @@ -235,8 +288,8 @@ function normalizePath(input?: string) {
*
* Followup (non-blocking): attach + jsonMode silently auto-rejects without
* emitting an `auto-reject` JSON event (non-attach mode emits one via
* RunEvents). Reaching parity would require either an attach-side JSON
* emitter here or moving JSON emission into a sink that both modes share.
* SessionAutoReply's sink). Reaching parity would require either an
* attach-side JSON emitter here or routing both modes through the same sink.
* Out of scope for F10 (which only collapses the dual permission paths).
*
* Each invocation produces exactly one `sdk.permission.reply` call. Caller
Expand Down Expand Up @@ -273,17 +326,17 @@ export async function replyPermissionAttachMode(input: {

/**
* Dispatch a `permission.asked` SSE event to either the no-op-with-log path
* (non-attach: `runEventsHandle` is set, in-process RunEvents owns the reply)
* or the attach-mode reply path (`runEventsHandle` is null, this client must
* reply via SDK).
* (non-attach: `runEventsHandle` is set, in-process SessionAutoReply owns the
* reply) or the attach-mode reply path (`runEventsHandle` is null, this
* client must reply via SDK).
*
* Exported for unit-test access only — the call site is `run.ts`'s SSE loop.
* Keeping it exported gives the F10 dual-path invariant a testable seam without
* having to drive the whole CLI.
*
* Invariant: `hasRunEventsHandle === !args.attach` (enforced at the
* `runEventsHandle` ternary in run.ts; see comment near construction). If that
* invariant ever drifts — e.g. attach mode also gets a server-side RunEvents
* invariant ever drifts — e.g. attach mode also gets a server-side auto-reply
* — the dual-responder race F10 was raised against returns. The unit tests
* for this dispatch pin the contract: at most one `sdk.permission.reply` per
* `permission.asked` event.
Expand All @@ -308,10 +361,10 @@ export async function dispatchPermissionAsked(input: {
if (input.permission.sessionID !== input.sessionID) return false

if (input.hasRunEventsHandle) {
// Non-attach mode: in-process RunEvents owns the auto-reply contract;
// here we only surface a UI line (skipped under dangerously-skip-permissions
// and under jsonMode, where the matching auto-reject JSON event is emitted
// by RunEvents instead).
// Non-attach mode: in-process SessionAutoReply owns the auto-reply
// contract; here we only surface a UI line (skipped under
// dangerously-skip-permissions and under jsonMode, where the matching
// auto-reject JSON event is emitted by SessionAutoReply's sink instead).
if (!input.skipPermissions && !input.jsonMode) {
input.println(
`permission requested: ${input.permission.permission} (${input.permission.patterns.join(", ")}); auto-rejecting`,
Expand Down Expand Up @@ -749,11 +802,13 @@ export const RunCommand = cmd({
const runEventsHandle = args.attach
? null
: await AppRuntime.runPromise(
RunEvents.make({
rootSessionID: sessionID,
skipPermissions: args["dangerously-skip-permissions"] === true,
jsonMode,
}),
SessionAutoReply.make(
{
rootSessionID: sessionID,
skipPermissions: args["dangerously-skip-permissions"] === true,
},
makeRunSink(jsonMode, sessionID),
),
)

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,35 @@ import { Session } from "@/session"
import { NotFoundError } from "@/storage"
import { SessionID } from "@/session/schema"
import { Log } from "@/util"
import type { Sink } from "./sink"

const log = Log.create({ service: "run-events" })
const log = Log.create({ service: "session-auto-reply" })

export const LIVELOCK_WARN_THRESHOLD = 5
export const MAX_LINEAGE_DEPTH = 32

export interface Config {
export type Config = {
rootSessionID: SessionID
skipPermissions: boolean
jsonMode: boolean
}

export interface Stats {
export type Stats = {
autoRejectedQuestions: number
autoRejectedPermissions: number
autoApprovedPermissions: number
livelockWarned: boolean
}

export interface Handle {
export type Handle = {
readonly stats: Stats
readonly unsubscribe: () => void
}

export const make = Effect.fn("RunEvents.make")(function* (config: Config) {
// TODO(auto-reply-acp): ACP has equivalent auto-permission-reply logic in
// src/acp/*. Unifying it on top of this core (so ACP also goes through Sink)
// is deferred to a follow-up PR — F11 only extracts CLI's RunEvents into
// reusable shape. One subsystem rework at a time.
export const make = Effect.fn("SessionAutoReply.make")(function* (config: Config, sink: Sink) {
const question = yield* Question.Service
const permission = yield* Permission.Service
const bus = yield* Bus.Service
Expand All @@ -45,14 +49,7 @@ export const make = Effect.fn("RunEvents.make")(function* (config: Config) {

const descendants = new Set<SessionID>([config.rootSessionID])

const emit = (type: string, data: Record<string, unknown>) => {
if (!config.jsonMode) return
process.stdout.write(
JSON.stringify({ type, timestamp: Date.now(), sessionID: config.rootSessionID, ...data }) + "\n",
)
}

const isDescendant = Effect.fn("RunEvents.isDescendant")(function* (sid: SessionID) {
const isDescendant = Effect.fn("SessionAutoReply.isDescendant")(function* (sid: SessionID) {
if (descendants.has(sid)) return true
let cur: SessionID | undefined = sid
const chain: SessionID[] = []
Expand All @@ -79,12 +76,13 @@ export const make = Effect.fn("RunEvents.make")(function* (config: Config) {
if (kind === "question") stats.autoRejectedQuestions++
else stats.autoRejectedPermissions++
const total = stats.autoRejectedQuestions + stats.autoRejectedPermissions
emit("auto-reject", { kind, autoRejectSessionID: sid, totalAutoRejects: total })
sink.onAutoReject({ kind, sessionID: sid, total })
if (!stats.livelockWarned && total > LIVELOCK_WARN_THRESHOLD) {
stats.livelockWarned = true
log.warn("possible subagent livelock: >5 auto-rejects in a single run", {
rootSessionID: config.rootSessionID,
})
sink.onLivelockWarn({ rootSessionID: config.rootSessionID })
}
}

Expand All @@ -95,11 +93,7 @@ export const make = Effect.fn("RunEvents.make")(function* (config: Config) {
// detect auto-reject loops.
const bumpApprove = (sid: SessionID) => {
stats.autoApprovedPermissions++
emit("auto-approve", {
kind: "permission",
autoApproveSessionID: sid,
totalAutoApproves: stats.autoApprovedPermissions,
})
sink.onAutoApprove({ kind: "permission", sessionID: sid, total: stats.autoApprovedPermissions })
}

// bus.subscribeCallback wraps the callback in an Effect.tryPromise-based
Expand All @@ -110,7 +104,7 @@ export const make = Effect.fn("RunEvents.make")(function* (config: Config) {
// subagent loops with many simultaneous descendants. Defects inside the forked
// fiber do not surface through that subscription callback wrapper, so log them
// here instead. Track in-flight fibers so unsubscribe() can interrupt them and
// bound handler work to the RunEvents lifecycle.
// bound handler work to the AutoReply lifecycle.
const inflight = new Set<Fiber.Fiber<void>>()
let closed = false
const fork = (effect: Effect.Effect<void>) => {
Expand Down Expand Up @@ -178,4 +172,4 @@ export const make = Effect.fn("RunEvents.make")(function* (config: Config) {
return { stats, unsubscribe } satisfies Handle
})

export * as RunEvents from "./run-events"
export * as SessionAutoReply from "./auto-reply"
46 changes: 46 additions & 0 deletions packages/opencode/src/session/auto-reply/sink.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import type { SessionID } from "../schema"

/**
* Sink interface for auto-reply telemetry. The auto-reply core is intentionally
* unaware of how/where these events surface (stdout JSON, structured logger,
* test capture, ACP transport, …) so it can be reused across CLI run mode,
* future TUI headless / daemon mode, and ACP without dragging emission policy
* into the bus-subscription core.
*
* Each method is fire-and-forget — synchronous, no return value. Implementations
* that need async work (network emit, etc.) should fire-and-forget internally;
* the bus dispatch path can not block on emission.
*
* **Contract: callbacks must not throw.** Sink invocations happen inside the
* auto-reply fiber *before* the `question.reject` / `permission.reply` side
* effects. A throwing sink would fail the fiber and skip the side effect, so
* the auto-reply contract (subagent always gets a response) would silently
* break. If a sink target can fail (closed pipe, network error, full disk),
* the implementation must catch the failure internally and either drop the
* event or buffer it. The core does not wrap callbacks in try/catch.
*/
export type Sink = {
readonly onAutoReject: (input: {
readonly kind: "question" | "permission"
readonly sessionID: SessionID
readonly total: number
}) => void
readonly onAutoApprove: (input: {
readonly kind: "permission"
readonly sessionID: SessionID
readonly total: number
}) => void
readonly onLivelockWarn: (input: { readonly rootSessionID: SessionID }) => void
}

/**
* No-op sink. Intended for tests and contexts that only care about the Stats
* counters on the returned handle (e.g. F12 regression-test reuse).
*/
export const silentSink: Sink = {
onAutoReject: () => {},
onAutoApprove: () => {},
onLivelockWarn: () => {},
}

export * as AutoReplySink from "./sink"
Loading
Loading