Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 11 additions & 23 deletions packages/opencode/src/session/auto-reply/auto-reply.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { Cause, Effect, Fiber, Option } from "effect"
import { Cause, Effect, Fiber } from "effect"
import { Bus } from "@/bus"
import { Permission } from "@/permission"
import { Question } from "@/question"
import { Session } from "@/session"
import { NotFoundError } from "@/storage"
import { SessionID } from "@/session/schema"
import { Log } from "@/util"
import type { Sink } from "./sink"
Expand Down Expand Up @@ -47,29 +46,18 @@ export const make = Effect.fn("SessionAutoReply.make")(function* (config: Config
livelockWarned: false,
}

const descendants = new Set<SessionID>([config.rootSessionID])
// Reused across calls so the parent walk is traversed at most once per
// descendant id. Session.isDescendantOf auto-seeds `rootSessionID` on every
// call, so an empty Set is the simplest seed. Wrapped in Effect.fn so
// AutoReply-specific lineage checks remain identifiable in traces alongside
// the inner Session.isDescendantOf span.
const descendants = new Set<SessionID>()

const isDescendant = Effect.fn("SessionAutoReply.isDescendant")(function* (sid: SessionID) {
if (descendants.has(sid)) return true
let cur: SessionID | undefined = sid
const chain: SessionID[] = []
let depth = 0
while (cur !== undefined && !descendants.has(cur) && depth < MAX_LINEAGE_DEPTH) {
chain.push(cur)
depth++
const lookup: Option.Option<Session.Info> = yield* session.get(cur).pipe(
Effect.option,
Effect.catchDefect((defect) => {
if (!NotFoundError.isInstance(defect)) return Effect.die(defect)
return Effect.succeed(Option.none<Session.Info>())
}),
)
if (Option.isNone(lookup)) break
cur = lookup.value.parentID ?? undefined
}
if (cur === undefined || !descendants.has(cur)) return false
chain.forEach((item) => descendants.add(item))
return true
return yield* session.isDescendantOf(sid, config.rootSessionID, {
maxDepth: MAX_LINEAGE_DEPTH,
cache: descendants,
})
})

const bump = (kind: "question" | "permission", sid: SessionID) => {
Expand Down
73 changes: 73 additions & 0 deletions packages/opencode/src/session/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,30 @@ export interface Interface {
sessionID: SessionID,
predicate: (msg: MessageV2.WithParts) => boolean,
) => Effect.Effect<Option.Option<MessageV2.WithParts>>
/**
* Returns true if `sid` is `root` or transitively descends from `root` via
* `parentID`. Walks the parent chain up to `opts.maxDepth` (default 64) and
* stops at any `NotFoundError` (returns false). When `opts.cache` is
* provided, it is used both as a positive-hit short-circuit and as an
* accumulator: every confirmed descendant in the walked chain is added to
* the set. The cache is auto-seeded with `root` on every call, so callers
* can pass a fresh `new Set()` and reuse it across calls without seeding.
*
* Cache reuse is only safe within a single root. As a partial guard, if a
* non-empty cache is passed that does not already contain `root`, the call
* dies — that case can only arise from a cache leaked from another lineage.
* The guard does NOT catch caches that have been mixed (root present plus
* entries from a different root); detecting that would require tagging the
* cache. Each `SessionAutoReply` instance owns its own cache, which is the
* intended usage. If you have a use case that needs disjoint roots to
* share a cache, build a `Map<root, Set<SessionID>>` outside this helper
* and pass the per-root inner Set.
*/
readonly isDescendantOf: (
sid: SessionID,
root: SessionID,
opts?: { maxDepth?: number; cache?: Set<SessionID> },
) => Effect.Effect<boolean>
}

export class Service extends Context.Service<Service, Interface>()("@opencode/Session") {}
Expand Down Expand Up @@ -671,6 +695,54 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> =
return Option.none<MessageV2.WithParts>()
})

const isDescendantOf = Effect.fn("Session.isDescendantOf")(function* (
sid: SessionID,
root: SessionID,
opts?: { maxDepth?: number; cache?: Set<SessionID> },
) {
const maxDepth = opts?.maxDepth ?? 64
const known = opts?.cache ?? new Set<SessionID>()
// Defend against accidental cache sharing across different roots: if a
// caller passes a non-empty cache that lacks `root`, those entries were
// populated under a different lineage and `known.has(sid)` could
// short-circuit to true with the wrong answer. Fail loudly instead of
// returning a silently wrong result.
if (known.size > 0 && !known.has(root)) {
return yield* Effect.die(
new Error(
`Session.isDescendantOf: opts.cache appears to belong to a different root (size=${known.size}, missing root=${root}). Caches must not be shared across roots.`,
),
)
}
// Always seed `root` into the working set so the parent walk has a
// termination anchor and the next cross-root reuse check still works.
known.add(root)
if (sid === root) return true
if (known.has(sid)) return true
// Walk parent chain. Track the chain so we can promote every visited
Comment on lines +703 to +722
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in f54a1ad. At call entry, if opts.cache is non-empty AND does not contain root, isDescendantOf now dies with a clear error naming the expected root and the cache size. New regression test 'dies if cache is non-empty but missing root' pins the guard. JSDoc updated. Picked option (a) over (b) because Map<root,Set> changes the API for the AutoReply use case where a single cache is correct, while the guard catches the actual bug class without complicating the simple case.

// node into the cache once we hit a known descendant — turns repeated
// calls against the same lineage into O(1) after the first walk.
const chain: SessionID[] = []
let cur: SessionID | undefined = sid
let depth = 0
while (cur !== undefined && !known.has(cur) && depth < maxDepth) {
chain.push(cur)
depth++
const lookup: Option.Option<Info> = yield* get(cur).pipe(
Effect.option,
Effect.catchDefect((defect) => {
if (!NotFoundError.isInstance(defect)) return Effect.die(defect)
return Effect.succeed(Option.none<Info>())
}),
)
if (Option.isNone(lookup)) break
cur = lookup.value.parentID ?? undefined
}
if (cur === undefined || !known.has(cur)) return false
chain.forEach((item) => known.add(item))
return true
})

return Service.of({
create,
fork,
Expand All @@ -693,6 +765,7 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> =
getPart,
updatePartDelta,
findMessage,
isDescendantOf,
})
}),
)
Expand Down
163 changes: 162 additions & 1 deletion packages/opencode/test/session/session.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { Bus } from "../../src/bus"
import { Log } from "../../src/util"
import { Instance } from "../../src/project/instance"
import { MessageV2 } from "../../src/session/message-v2"
import { MessageID, PartID, type SessionID } from "../../src/session/schema"
import { MessageID, PartID, SessionID } from "../../src/session/schema"
import { AppRuntime } from "../../src/effect/app-runtime"
import { tmpdir } from "../fixture/fixture"

Expand Down Expand Up @@ -179,3 +179,164 @@ describe("Session", () => {
expect(missing).toBe(true)
})
})

function isDescendantOf(
sid: SessionID,
root: SessionID,
opts?: { maxDepth?: number; cache?: Set<SessionID> },
) {
return AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.isDescendantOf(sid, root, opts)))
}

describe("Session.isDescendantOf", () => {
test("identity: a session is its own descendant", async () => {
await using tmp = await tmpdir({ git: true })
await Instance.provide({
directory: tmp.path,
fn: async () => {
const root = await create({ title: "root" })
expect(await isDescendantOf(root.id, root.id)).toBe(true)
await remove(root.id)
},
})
})

test("direct child is a descendant", async () => {
await using tmp = await tmpdir({ git: true })
await Instance.provide({
directory: tmp.path,
fn: async () => {
const root = await create({ title: "root" })
const child = await create({ title: "child", parentID: root.id })
expect(await isDescendantOf(child.id, root.id)).toBe(true)
await remove(root.id)
},
})
})

test("grandchild is a descendant", async () => {
await using tmp = await tmpdir({ git: true })
await Instance.provide({
directory: tmp.path,
fn: async () => {
const root = await create({ title: "root" })
const child = await create({ title: "child", parentID: root.id })
const grandchild = await create({ title: "grandchild", parentID: child.id })
expect(await isDescendantOf(grandchild.id, root.id)).toBe(true)
await remove(root.id)
},
})
})

test("unrelated session is not a descendant", async () => {
await using tmp = await tmpdir({ git: true })
await Instance.provide({
directory: tmp.path,
fn: async () => {
const rootA = await create({ title: "rootA" })
const rootB = await create({ title: "rootB" })
expect(await isDescendantOf(rootB.id, rootA.id)).toBe(false)
await remove(rootA.id)
await remove(rootB.id)
},
})
})

test("non-existent session id is not a descendant", async () => {
await using tmp = await tmpdir({ git: true })
await Instance.provide({
directory: tmp.path,
fn: async () => {
const root = await create({ title: "root" })
const ghost = SessionID.make("ses_ghost_does_not_exist_00000")
expect(await isDescendantOf(ghost, root.id)).toBe(false)
await remove(root.id)
},
})
})

test("respects maxDepth: returns false when chain is deeper than the limit", async () => {
await using tmp = await tmpdir({ git: true })
await Instance.provide({
directory: tmp.path,
fn: async () => {
const root = await create({ title: "root" })
// Build a chain root → c1 → c2 → c3.
const c1 = await create({ title: "c1", parentID: root.id })
const c2 = await create({ title: "c2", parentID: c1.id })
const c3 = await create({ title: "c3", parentID: c2.id })
// maxDepth: 1 means we walk at most one parent edge from c3, which
// reaches c2 (not yet known), so we cannot confirm root and must
// return false. With the default depth (64), c3 is reachable.
expect(await isDescendantOf(c3.id, root.id, { maxDepth: 1 })).toBe(false)
expect(await isDescendantOf(c3.id, root.id)).toBe(true)
await remove(root.id)
},
})
})

test("cache accumulates confirmed descendants across calls", async () => {
await using tmp = await tmpdir({ git: true })
await Instance.provide({
directory: tmp.path,
fn: async () => {
const root = await create({ title: "root" })
const c1 = await create({ title: "c1", parentID: root.id })
const c2 = await create({ title: "c2", parentID: c1.id })
const cache = new Set<SessionID>([root.id])
// First call walks the full chain c2 → c1 → root and promotes both
// intermediate nodes into the cache.
expect(await isDescendantOf(c2.id, root.id, { cache })).toBe(true)
expect(cache.has(c1.id)).toBe(true)
expect(cache.has(c2.id)).toBe(true)
// A subsequent call for c1 short-circuits via the cache (depth: 0
// forbids any walk; cache hit alone must satisfy the call).
expect(await isDescendantOf(c1.id, root.id, { cache, maxDepth: 0 })).toBe(true)
await remove(root.id)
},
})
})

test("auto-seeds root into cache so callers can pass an empty Set", async () => {
await using tmp = await tmpdir({ git: true })
await Instance.provide({
directory: tmp.path,
fn: async () => {
const root = await create({ title: "root" })
const child = await create({ title: "child", parentID: root.id })
// Caller passes a fresh empty Set — root must still be reachable.
// Without the auto-seed in isDescendantOf this would walk past root
// (since `known.has(root)` would be false), bottom out at the
// not-found parent of root, and return false.
const cache = new Set<SessionID>()
expect(await isDescendantOf(child.id, root.id, { cache })).toBe(true)
expect(cache.has(root.id)).toBe(true)
await remove(root.id)
},
})
})

test("dies if cache is non-empty but missing root (cross-root reuse guard)", async () => {
await using tmp = await tmpdir({ git: true })
await Instance.provide({
directory: tmp.path,
fn: async () => {
const rootA = await create({ title: "rootA" })
const rootB = await create({ title: "rootB" })
const childA = await create({ title: "childA", parentID: rootA.id })
// Populate a cache under rootA, then incorrectly try to reuse it
// against rootB. The guard must reject this loudly.
const cache = new Set<SessionID>()
expect(await isDescendantOf(childA.id, rootA.id, { cache })).toBe(true)
// cache now contains {rootA, childA} but not rootB.
let died = false
await isDescendantOf(rootA.id, rootB.id, { cache }).catch(() => {
died = true
})
expect(died).toBe(true)
await remove(rootA.id)
await remove(rootB.id)
},
})
})
})
Loading