From 13a91d15278c1a4f485145912750879e0828fc1b Mon Sep 17 00:00:00 2001 From: ditadi Date: Mon, 11 May 2026 23:17:45 +0100 Subject: [PATCH] feat(appkit): executeTask, TypedTaskContext, SSE bridge, step MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The public AppKit-side surface for TaskFlow durable execution. Plugin authors can now register typed durable tasks (`TaskDefinition`), bridge them to SSE via `this.executeTask(res, name, input, settings?)`, use `step()` for replay-safe checkpoints, and route OBO identity automatically through `runInUserContext` + `getCurrentUserContext`. Public surface added under `packages/appkit/src/taskflow/`: - `TypedTaskContext` — narrows `ctx.emit(name, payload)` to a declared event-name → payload-shape map so the SSE wire shape is typed end-to-end. - `TaskDefinition` and `TaskHandleRef`/`TaskRef` — the registration object accepted by `taskflow.task(...)` and the branded handle it returns. Phantom generic parameters propagate to `executeTask` so input / event maps flow without redundant generics. - `ExecuteTaskSettings` — strictly disjoint from `PluginExecutionSettings`. `retry`, `cache`, `timeout`, `stream`, and `userId` are typed as `never` so any caller copying a settings object from `execute()` / `executeStream()` gets a clear compile-time error. TaskFlow handles those concerns natively (smart recovery, IK dedup, cooperative `stop()`). - `step(fn)` / `step("name", fn)` — workflow primitive with deferred binding (resolves the engine wrapper on first invocation so plugin authors can declare `const x = step(...)` at module scope) and an explicit guard against anonymous-arrow WAL key collisions. - `setupSseHeaders`, `writeSseFrame`, `writeSseComment`, `RESERVED_BRIDGE_EVENT_NAMES`, `TASKFLOW_IK_HEADER` — SSE wire helpers with a CRLF guard on event names, multi-line \`data:\` framing, and \`Vary: Origin\` for cache safety. - `userContextFromTaskCtx(ctx)` — reads the OBO `UserContext` from the FFI sidecar in `ctx.context`, with a discriminator check (`isUserContext`) so a stale or wrong-shape payload returns null instead of being trusted. `executeTask` (lives in `taskflow/execute-task.ts`) implements the durable bridge: - Derives identity from `getCurrentUserContext()` only (never from request headers or settings) — `this.asUser(req).executeTask(...)` yields OBO, the bare call yields SP. - Submits via `taskflow.start(...)` with the engine sidecar carrying the OBO `UserContext`, then bridges the WAL stream to SSE with `Last-Event-ID` replay support clamped to `[0, MAX_SAFE_INTEGER]`. - Drops `custom:step:*` events (WAL-only checkpoints), translates `heartbeat` to SSE comments, and routes terminal events (`completed` / `failed` / `cancelled`) before closing. - Reserved-name guard: plugin emissions whose name collides with the bridge wire vocabulary (`ready`, `completed`, `failed`, `error`, …) are dropped with `logger.warn`, never closing the stream. - BigInt-safe JSON replacer (warehouse payloads round-trip cleanly). - `cancelOnDisconnect` (default `true`) + `disconnectGraceMs` (default 5 s) — short reconnects don't kill the durable run. - Production-safe error path: post-headers errors mask the message when `NODE_ENV=production`. `TaskflowService` extensions in `taskflow/index.ts`: - `task(def)` — typed registration with a hard-error on duplicate registration in production (silent shadowing was the prior failure mode); HMR warning in dev. - `hasTask(name)` / `getRegistration(name)` — used by `executeTask` to surface OBO-misconfiguration diagnostics. - `_registerBridge(bridge)` — bookkeeping for active SSE bridges so `shutdown()` can drain them with an explicit `event: error` / `server_shutting_down` frame *before* the engine closes iterators (otherwise the client sees mid-stream EOF). Context layer: - `getCurrentUserContext()` — public accessor for the active OBO `UserContext` (sugar for the same AsyncLocalStorage slot `isInUserContext()` already reads). Plugin layer: - `Plugin.executeTask(res, task, input, settings?)` — typed entry point. `task` accepts either a registered name (`string`) or a branded `TaskRef`. The method is deliberately NOT in `EXCLUDED_FROM_PROXY` so OBO routing via the `asUser` proxy works uniformly with `execute` / `executeStream`. Tests scaffolding: - `tools/test-helpers.ts` — `createStubTaskflowService` (in-process fake of the TaskFlow surface for unit tests; consumed by PR 5's analytics tests) and a robust `parseSSEResponse` upgrade (multi-line `data:` joining per the SSE spec, CRLF normalisation, comment skipping, `eventType` filtering). Verify: - `pnpm -r typecheck`, `pnpm build`, `pnpm test` (122 files, 2279 tests) all green. - `pnpm exec knip` clean (no unused exports or types). - `pnpm exec biome check` clean on touched files. Not in this PR. No plugin uses `executeTask` yet — analytics migration is PR 5. No demo plugin — that's PR 6. No docs rewrite — that's PR 7. Stacked on: stack/taskflow/taskflow-service (#376). Signed-off-by: Victor Ditadi Signed-off-by: ditadi --- .../appkit/src/context/execution-context.ts | 17 +- packages/appkit/src/context/index.ts | 1 + packages/appkit/src/index.ts | 23 + packages/appkit/src/plugin/plugin.ts | 67 ++- packages/appkit/src/taskflow/execute-task.ts | 328 ++++++++++++ packages/appkit/src/taskflow/index.ts | 499 +++++++++++++++--- packages/appkit/src/taskflow/sse.ts | 127 +++++ tools/test-helpers.ts | 268 +++++++++- 8 files changed, 1235 insertions(+), 95 deletions(-) create mode 100644 packages/appkit/src/taskflow/execute-task.ts create mode 100644 packages/appkit/src/taskflow/sse.ts diff --git a/packages/appkit/src/context/execution-context.ts b/packages/appkit/src/context/execution-context.ts index d707f52de..5aad82d54 100644 --- a/packages/appkit/src/context/execution-context.ts +++ b/packages/appkit/src/context/execution-context.ts @@ -83,9 +83,20 @@ export function getWorkspaceId(): Promise { } /** - * Check if currently running in a user context. + * Check if currently running in a user context. Sugar for + * `getCurrentUserContext() !== null` — prefer {@link getCurrentUserContext} + * when you also need the value. */ export function isInUserContext(): boolean { - const ctx = executionContextStorage.getStore(); - return ctx !== undefined; + return getCurrentUserContext() !== null; +} + +/** + * Returns the active {@link UserContext} when inside a + * `runInUserContext` scope (set by `plugin.asUser(req).method(...)`), + * or `null`. Use to forward user identity to a downstream layer + * (e.g. spawning a durable task that should run as the caller). + */ +export function getCurrentUserContext(): UserContext | null { + return executionContextStorage.getStore() ?? null; } diff --git a/packages/appkit/src/context/index.ts b/packages/appkit/src/context/index.ts index d306d359e..7470ca367 100644 --- a/packages/appkit/src/context/index.ts +++ b/packages/appkit/src/context/index.ts @@ -1,4 +1,5 @@ export { + getCurrentUserContext, getCurrentUserId, getExecutionContext, getWarehouseId, diff --git a/packages/appkit/src/index.ts b/packages/appkit/src/index.ts index 00fd6ff86..17203146f 100644 --- a/packages/appkit/src/index.ts +++ b/packages/appkit/src/index.ts @@ -96,6 +96,29 @@ export { ResourceRegistry, ResourceType, } from "./registry"; +// TaskFlow durable execution. `TaskflowService` is exported type-only: +// it's constructed by `createApp` and addressed via `this.taskflow` +// inside plugins; internal statics must not leak to consumers. +export { + type ExecuteTaskSettings, + type ResumeOptions, + type SseEvent, + type StopOptions, + type StreamEvent, + type SubmitOptions, + setupSseHeaders, + step, + TASKFLOW_IK_HEADER, + type Task, + type TaskContext, + type TaskDefinition, + type TaskEvent, + type TaskflowConfig, + type TaskflowService, + type TaskHandle, + type TypedTaskContext, + writeSseFrame, +} from "./taskflow"; // Telemetry (for advanced custom telemetry) export { type Counter, diff --git a/packages/appkit/src/plugin/plugin.ts b/packages/appkit/src/plugin/plugin.ts index effa3f389..7a2d53db0 100644 --- a/packages/appkit/src/plugin/plugin.ts +++ b/packages/appkit/src/plugin/plugin.ts @@ -24,7 +24,12 @@ import type { PluginContext } from "../core/plugin-context"; import { AppKitError, AuthenticationError } from "../errors"; import { createLogger } from "../logging/logger"; import { StreamManager } from "../stream"; -import { TaskflowService } from "../taskflow"; +import { + type ExecuteTaskSettings, + TaskflowService, + type TaskRef, +} from "../taskflow"; +import { executeTask as executeTaskImpl } from "../taskflow/execute-task"; import { type ITelemetry, normalizeTelemetryOptions, @@ -74,11 +79,14 @@ function hasHttpStatusCode( /** * Methods that should not be proxied by asUser(). - * These are lifecycle/internal methods that don't make sense - * to execute in a user context. + * Lifecycle/internal methods that don't make sense in a user context. + * + * Note: `executeTask` is deliberately NOT excluded — it MUST run inside + * the proxy's `runInUserContext` so `getCurrentUserContext()` forwards + * the OBO context to the engine. Excluding it would silently downgrade + * OBO calls to SP. */ const EXCLUDED_FROM_PROXY = new Set([ - // Lifecycle methods "setup", "shutdown", "attachContext", @@ -87,12 +95,10 @@ const EXCLUDED_FROM_PROXY = new Set([ "getSkipBodyParsingPaths", "abortActiveOperations", "clientConfig", - // asUser itself - prevent chaining like .asUser().asUser() + // Prevent chained .asUser().asUser(). "asUser", - // Internal methods "constructor", - // Synchronous accessor for the shared singleton — no need for an - // extra async-context frame. + // Synchronous singleton accessor; no need for an async-context frame. "requireTaskflow", ]); @@ -560,6 +566,51 @@ export abstract class Plugin< ); } + /** + * Bridges a registered durable task to an SSE response. + * + * Submits via `taskflow.start(...)`, subscribes, writes each event as + * `id: \nevent: \ndata: `. Supports `Last-Event-ID` + * reconnect from the WAL. + * + * Identity comes from the active `runInUserContext` scope. For OBO, + * call through the proxy: `this.asUser(req).executeTask(...)`. + * + * Unlike `execute()` / `executeStream()`, this does not accept + * `retry` / `cache` / `timeout` — TaskFlow handles them natively. + * + * @example + * ```ts + * // SP + * await this.executeTask(res, "agent-loop", req.body); + * + * // OBO, no cancel on reconnect + * await this.asUser(req).executeTask(res, "agent-loop", req.body, { + * cancelOnDisconnect: false, + * }); + * ``` + */ + protected async executeTask< + TInput = unknown, + TResult = unknown, + TEvents extends Record = Record, + >( + res: express.Response, + task: string | TaskRef, + input: TInput, + settings?: ExecuteTaskSettings, + ): Promise { + const taskflow = this.requireTaskflow(); + const taskName = typeof task === "string" ? task : task.name; + return executeTaskImpl( + { taskflow, telemetry: this.telemetry, pluginName: this.name }, + res, + taskName, + input, + settings, + ); + } + /** * Execute a function with the plugin's interceptor chain. * diff --git a/packages/appkit/src/taskflow/execute-task.ts b/packages/appkit/src/taskflow/execute-task.ts new file mode 100644 index 000000000..04703219c --- /dev/null +++ b/packages/appkit/src/taskflow/execute-task.ts @@ -0,0 +1,328 @@ +/** + * `executeTask` — durable POST + SSE bridge in one call. + * + * Wire shape: each `ctx.emit(name, payload)` becomes + * `event: ` / `data: `. Bridge-handled: + * `heartbeat` → SSE comment, `custom:step:*` → dropped (WAL-only), + * `completed`/`failed`/`cancelled` → forwarded, then loop exits. + * + * Identity comes from the active `runInUserContext` scope set by the + * `asUser(req)` proxy — never from headers or settings. + */ + +import { SpanStatusCode } from "@opentelemetry/api"; +import type express from "express"; +import { getCurrentUserContext } from "../context"; +import { createLogger } from "../logging/logger"; +import type { ITelemetry } from "../telemetry"; +import type { + ActiveBridge, + ExecuteTaskSettings, + TaskflowService, +} from "./index"; +import { + RESERVED_BRIDGE_EVENT_NAMES, + setupSseHeaders, + writeSseComment, + writeSseFrame, +} from "./sse"; + +/** + * Response header carrying the engine-derived idempotency key + * (`sha256(name || canon(input) || userId)`) so clients can issue + * follow-up `/resume/:ik` / `/stop/:ik` without rebuilding the input. + * Mirrored as the first SSE event (`event: ready`) for cross-origin + * clients that can't read response headers. + * + * @public + */ +export const TASKFLOW_IK_HEADER = "X-Taskflow-Idempotency-Key"; + +const logger = createLogger("taskflow:execute-task"); + +/** Process-scoped to log the OBO+autoRecover warning once per (plugin, task). @internal */ +const oboAutoRecoverWarned = new Set(); + +export async function executeTask( + deps: { + taskflow: TaskflowService; + telemetry: ITelemetry; + pluginName: string; + }, + res: express.Response, + taskName: string, + input: TInput, + settings: ExecuteTaskSettings = {}, +): Promise { + const { taskflow, telemetry, pluginName } = deps; + + // Identity comes only from the active `runInUserContext` scope. A + // settings-based `userId` override would let any caller forge + // ownership of `taskflow.stop()` / `resume()` for another user's IK. + const userCtx = getCurrentUserContext(); + const userId = userCtx?.userId; + const isObo = userCtx !== null; + + // OBO + autoRecover is incompatible: the recovery worker has no + // UserContext, so post-restart OBO calls fail. Warn once per + // (plugin, task) so a high-traffic misconfigured task doesn't flood. + if (isObo) { + const reg = taskflow.getRegistration(taskName); + const warningKey = `${pluginName}:${taskName}`; + if (reg?.autoRecover && !oboAutoRecoverWarned.has(warningKey)) { + oboAutoRecoverWarned.add(warningKey); + logger.warn( + `Plugin "${pluginName}" registered OBO task "${taskName}" with autoRecover=true. ` + + "After restart, recovery runs without the original UserContext and OBO calls " + + "will fail. Register with `autoRecover: false` and call `taskflow.resume()` " + + "from a fresh authenticated request.", + ); + } + } + + const wantTraces = settings.telemetry?.traces ?? true; + // Spans: `taskflow..` (parent, submit + subscribe) and + // `taskflow.start` (submit only). Subscribe stays under the parent — + // a per-event child span would be expensive for no extra signal. + const tracer = wantTraces ? telemetry.getTracer() : null; + const span = tracer?.startSpan(`taskflow.${pluginName}.${taskName}`, { + attributes: { + "taskflow.task_name": taskName, + "taskflow.plugin_name": pluginName, + "taskflow.obo": isObo, + "taskflow.execute_mode": settings.executeMode ?? "at_least_once", + }, + }); + + let idempotencyKey: string; + // Hoisted so the outer `finally` can release the bridge regardless + // of which branch unwinds. + let unregisterBridge: (() => void) | null = null; + try { + // `context` is the live executor sidecar — never serialised, never + // seen by recovery. Lets the task body re-enter `runInUserContext` + // without re-parsing headers. + const startSpan = tracer?.startSpan("taskflow.start", { + attributes: { + "taskflow.task_name": taskName, + "taskflow.execute_mode": settings.executeMode ?? "at_least_once", + }, + }); + try { + const handle = await taskflow.start(taskName, input, { + userId, + context: userCtx ?? undefined, + executeMode: settings.executeMode, + }); + idempotencyKey = handle.idempotencyKey; + startSpan?.setAttribute("taskflow.idempotency_key", idempotencyKey); + startSpan?.setStatus({ code: SpanStatusCode.OK }); + } catch (err) { + const message = + (err as { message?: string } | undefined)?.message ?? + "task start failed"; + startSpan?.setStatus({ code: SpanStatusCode.ERROR, message }); + span?.setStatus({ code: SpanStatusCode.ERROR, message }); + throw err; + } finally { + startSpan?.end(); + } + + span?.setAttribute("taskflow.idempotency_key", idempotencyKey); + + const sseRequest = res.req as express.Request | undefined; + const lastEventId = parseLastEventId(sseRequest); + + if (!res.headersSent) { + // Dual surface: header for same-origin fetch, `ready` event for + // cross-origin EventSource (which can't read headers). + res.setHeader(TASKFLOW_IK_HEADER, idempotencyKey); + setupSseHeaders(res); + writeSseFrame(res, { + event: "ready", + data: JSON.stringify({ idempotencyKey }), + }); + } + + const cancelOnDisconnect = settings.cancelOnDisconnect ?? true; + const disconnectGraceMs = Math.max(0, settings.disconnectGraceMs ?? 5000); + let clientClosed = false; + sseRequest?.once?.("close", () => { + clientClosed = true; + if (!cancelOnDisconnect) return; + // Grace window so a "wifi blipped for 2 s" reconnect doesn't + // durably suspend the task. + setTimeout(() => { + taskflow + .stop(idempotencyKey, { + reason: "client_disconnected", + userId, + }) + .catch((err: unknown) => { + logger.debug( + `taskflow.stop after client disconnect failed for ${idempotencyKey}: %O`, + err, + ); + }); + }, disconnectGraceMs).unref?.(); + }); + + // Register so the service can write a final `event: error` frame + // on shutdown before the engine closes the iterator. + const bridge: ActiveBridge = { + idempotencyKey, + drain: (reason) => { + clientClosed = true; + if (res.writableEnded) return; + try { + writeSseFrame(res, { + event: "error", + data: JSON.stringify({ message: reason }), + }); + } catch { + // Response may already be in a bad state. + } + if (!res.writableEnded) res.end(); + }, + }; + unregisterBridge = taskflow._registerBridge(bridge); + + for await (const streamEvent of taskflow.subscribe( + idempotencyKey, + lastEventId, + )) { + if (clientClosed || res.writableEnded) break; + + const rawType = streamEvent.event.eventType; + + // Heartbeats are wire-level keep-alives — emit as SSE comment so + // proxies don't drop idle sockets without leaking to the client. + if (rawType === "heartbeat") { + writeSseComment(res, "hb"); + continue; + } + + // `step:*` checkpoints are WAL-only (consumed by `step()` on + // recovery). The `step:` prefix is reserved — a plugin emitting + // `ctx.emit("step:foo", ...)` will be filtered here too. + if (typeof rawType === "string" && rawType.startsWith("custom:step:")) { + continue; + } + + // Engine prefixes user events with `custom:`; strip for the wire. + const isCustom = + typeof rawType === "string" && rawType.startsWith("custom:"); + const strippedName = isCustom + ? rawType.slice("custom:".length) + : (rawType ?? "message"); + + // Reserved-name guard: a plugin emitting `completed` would close + // the EventSource on the client while the engine keeps publishing. + // Engine-emitted reserved events (`isCustom === false`) pass. + if (isCustom && RESERVED_BRIDGE_EVENT_NAMES.has(strippedName)) { + logger.warn( + `Plugin "${pluginName}" task "${taskName}" emitted reserved event ` + + `name "${strippedName}" — refusing to forward.`, + ); + continue; + } + const eventName = strippedName; + + let data: string; + try { + // `bigintReplacer`: warehouse `LONG`/`BIGINT` columns surface as + // JS `BigInt`, which `JSON.stringify` rejects with `TypeError`. + data = JSON.stringify(streamEvent.event.payload ?? {}, bigintReplacer); + } catch (err) { + logger.warn( + `Failed to serialise event payload for "${taskName}" (event=${rawType}): %O`, + err, + ); + data = "{}"; + } + + try { + // `streamSeq` as SSE `id:` so `Last-Event-ID` reconnects resume + // from the WAL via `taskflow.subscribe(ik, lastSeq)`. + writeSseFrame(res, { + id: streamEvent.streamSeq, + event: eventName, + data, + }); + } catch (err) { + logger.debug("SSE write failed; closing stream", err); + break; + } + + if ( + rawType === "completed" || + rawType === "failed" || + rawType === "cancelled" + ) { + break; + } + } + + if (!res.writableEnded) res.end(); + span?.setStatus({ code: SpanStatusCode.OK }); + } catch (err) { + span?.setStatus({ + code: SpanStatusCode.ERROR, + message: + (err as { message?: string } | undefined)?.message ?? + "executeTask failed", + }); + if (!res.headersSent) { + const isProd = process.env.NODE_ENV === "production"; + const message = isProd + ? "Server error" + : ((err as { message?: string } | undefined)?.message ?? + "executeTask failed"); + res.status(500).json({ error: message }); + } else if (!res.writableEnded) { + // In-band SSE error. Redact the message in production so handler + // exceptions (stacks, paths, secrets) don't reach the wire. + try { + const isProd = process.env.NODE_ENV === "production"; + const message = isProd + ? "Server error" + : ((err as { message?: string } | undefined)?.message ?? ""); + writeSseFrame(res, { + event: "error", + data: JSON.stringify({ message }), + }); + } catch { + // Ignore. + } + res.end(); + } + logger.error( + `executeTask("${taskName}") failed for plugin "${pluginName}": %O`, + err, + ); + throw err; + } finally { + unregisterBridge?.(); + span?.end(); + } +} + +function parseLastEventId( + req: express.Request | undefined, +): number | undefined { + const raw = req?.header?.("last-event-id") ?? req?.header?.("Last-Event-ID"); + if (!raw) return undefined; + const parsed = parseInt(String(raw), 10); + if (!Number.isFinite(parsed)) return undefined; + // Clamp: negative values rewind past the WAL retention prefix + // (used to infinite-loop on subscribe), and engine seq comparisons + // lose precision past 2^53. Engine still validates against its own + // retention window — this just stops the obvious abuse at the FFI. + if (parsed < 0 || parsed > Number.MAX_SAFE_INTEGER) return undefined; + return parsed; +} + +/** Serialises `BigInt` as string. Warehouse `LONG`/`BIGINT` columns. */ +function bigintReplacer(_key: string, value: unknown): unknown { + return typeof value === "bigint" ? value.toString() : value; +} diff --git a/packages/appkit/src/taskflow/index.ts b/packages/appkit/src/taskflow/index.ts index 2971b9dff..5e1e256f3 100644 --- a/packages/appkit/src/taskflow/index.ts +++ b/packages/appkit/src/taskflow/index.ts @@ -1,9 +1,7 @@ /** - * TaskflowService — durable execution core service. - * - * Wraps the vendored TaskFlow Node.js bindings (Rust + napi). Booted by - * `createApp` and exposed to plugins as `this.taskflow`. Default storage - * is SQLite at `.appkit/taskflow/tasks.db`; opt out with + * TaskflowService — durable execution. Wraps the vendored TaskFlow + * Rust+napi binding. Booted by `createApp`, exposed as `this.taskflow`. + * Default storage: SQLite at `.appkit/taskflow/tasks.db`. Opt out with * `createApp({ taskflow: false })`. */ @@ -16,25 +14,27 @@ import type { StreamEvent, SubmitOptions, Task, + TaskContext, TaskEvent, TaskflowConfig, Engine as TaskflowEngine, TaskHandle, } from "../../vendor/taskflow/taskflow.js"; +import type { UserContext } from "../context"; import { InitializationError } from "../errors"; import { createLogger } from "../logging/logger"; +import { type SseEvent, setupSseHeaders, writeSseFrame } from "./sse"; const logger = createLogger("taskflow"); -/** Type-only import keeps `import "@databricks/appkit"` from touching the native binary. */ +/** Type-only — avoids loading the native binary on `import`. */ type VendorModule = typeof import("../../vendor/taskflow/taskflow.js"); let cachedVendor: VendorModule | null = null; /** - * Lazy-loads the vendored binary. The artifact only ships for - * `darwin-arm64` and `linux-x64`; deferring the load keeps the SDK - * importable on other platforms when the caller opts out. + * Lazy-loads the vendored binary so the SDK stays importable on + * platforms without a published artifact when the caller opts out. * @internal */ async function loadVendorModule(): Promise { @@ -60,10 +60,9 @@ async function loadVendorModule(): Promise { } /** - * Verifies the platform `.node` and JS loader against `VENDOR.json` and - * throws before the loader executes on mismatch. Opt-in via - * `APPKIT_VERIFY_TASKFLOW_VENDOR=1`. `taskflow.d.ts` is intentionally - * not checked (types-only, never executed). + * Verifies the platform `.node` + JS loader against `VENDOR.json`, + * throwing before the loader executes on mismatch. Opt-in via + * `APPKIT_VERIFY_TASKFLOW_VENDOR=1`. `taskflow.d.ts` is types-only. * @internal */ async function verifyVendorIntegrity(): Promise { @@ -117,11 +116,7 @@ async function verifyVendorIntegrity(): Promise { } } -/** - * Default config. On-disk state lives under `.appkit/taskflow/` so the - * footprint is obvious; `enableTestMode: false` keeps `simulateCrash` - * behind an explicit opt-in. - */ +/** On-disk state under `.appkit/taskflow/`; `simulateCrash` is opt-in. */ const APPKIT_DEFAULTS: TaskflowConfig = { engine: { walPath: ".appkit/taskflow/wal", @@ -139,10 +134,9 @@ const APPKIT_DEFAULTS: TaskflowConfig = { }; /** - * Warns when SQLite is paired with a Databricks Apps environment — - * the per-pod filesystem cannot survive rolling restarts, so durability - * silently degrades. Can't refuse to boot since single-process dev - * looks identical at the config level. + * Warns when SQLite is paired with Databricks Apps — per-pod filesystem + * cannot survive rolling restarts, so durability silently degrades. + * Can't refuse to boot: single-process dev looks identical at config. * @internal */ function warnOnEphemeralStorage(config: TaskflowConfig): void { @@ -157,17 +151,15 @@ function warnOnEphemeralStorage(config: TaskflowConfig): void { "TaskFlow is configured with the SQLite backend but the runtime " + "appears to be Databricks Apps (multi-pod, no shared volume). " + "Tasks will not survive rolling restarts. For production, switch " + - "the backend to `lakebase` (Postgres) by passing " + - "`taskflow: { storage: { backend: 'lakebase', connectionString: … } }` " + - "to `createApp(...)`.", + "to `lakebase` via " + + "`taskflow: { storage: { backend: 'lakebase', connectionString: … } }`.", ); } /** - * Merges user config over defaults. `engine`/`executor` merge one level - * deep; `storage` is a discriminated union and replaces wholesale. - * `wal`/`admission`/`stream` are only emitted when present — the FFI - * distinguishes "key absent" from "key present with no fields". + * `engine`/`executor` merge one level deep; `storage` is a discriminated + * union and replaces wholesale; `wal`/`admission`/`stream` are only + * emitted when present (FFI distinguishes absent vs present-empty). */ function mergeAppkitDefaults(user: TaskflowConfig | undefined): TaskflowConfig { if (!user) return APPKIT_DEFAULTS; @@ -185,38 +177,312 @@ function mergeAppkitDefaults(user: TaskflowConfig | undefined): TaskflowConfig { } /** - * Engine types re-exported as the AppKit-side surface for `this.taskflow`. + * Wraps an async function as an idempotent WAL-keyed checkpoint inside a + * durable task. On recovery, completed steps short-circuit to the cached + * result instead of re-executing — use this for expensive / unsafe-to- + * replay stages (LLM calls, large queries, external I/O writes). + * + * **Naming matters**: the WAL key is `Function.name`, so anonymous + * arrows all collide on `""`. `step(fn)` requires a non-empty + * `.name` (named declaration or `const x = step(...)` assignment); + * `step("name", fn)` overrides it explicitly. + * + * Lazily resolves the engine primitive on first invocation so module- + * scope `const x = step(...)` works before `createApp` has booted. + * + * @example + * ```ts + * const fetchInvoices = step("fetch-invoices", async (ctx, id: string) => + * invoiceClient.list({ accountId: id }), + * ); + * ``` + * + * @public + */ +export function step( + name: string, + fn: (ctx: TaskContext, ...args: TArgs) => Promise, +): (ctx: TaskContext, ...args: TArgs) => Promise; +export function step( + fn: (ctx: TaskContext, ...args: TArgs) => Promise, +): (ctx: TaskContext, ...args: TArgs) => Promise; +export function step( + nameOrFn: string | ((ctx: TaskContext, ...args: TArgs) => Promise), + maybeFn?: (ctx: TaskContext, ...args: TArgs) => Promise, +): (ctx: TaskContext, ...args: TArgs) => Promise { + let stepName: string; + let target: (ctx: TaskContext, ...args: TArgs) => Promise; + if (typeof nameOrFn === "string") { + if (!nameOrFn) { + throw new Error("step(name, fn): name must be non-empty."); + } + if (typeof maybeFn !== "function") { + throw new Error("step(name, fn): missing function argument."); + } + stepName = nameOrFn; + const renamed = (...args: Parameters) => maybeFn(...args); + Object.defineProperty(renamed, "name", { value: stepName }); + target = renamed as typeof maybeFn; + } else { + if (!nameOrFn.name) { + throw new Error( + 'step(fn): wrapped function has empty `.name` — would collide with other anonymous steps in the WAL. Use `const x = step(...)` or pass an explicit name via `step("my-step", fn)`.', + ); + } + stepName = nameOrFn.name; + target = nameOrFn; + } + + let memoized: + | ((ctx: TaskContext, ...args: TArgs) => Promise) + | null = null; + const trampoline = (ctx: TaskContext, ...args: TArgs): Promise => { + if (!memoized) { + if (!cachedVendor) { + throw InitializationError.notInitialized( + "TaskflowService", + `step("${stepName}") ran before TaskFlow initialised — call it inside a registered task body.`, + ); + } + memoized = cachedVendor.workflow.step( + target as unknown as ( + ctx: TaskContext, + ...args: unknown[] + ) => Promise, + ) as unknown as (ctx: TaskContext, ...args: TArgs) => Promise; + } + return memoized(ctx, ...args); + }; + Object.defineProperty(trampoline, "name", { value: stepName }); + return trampoline; +} + +/** + * `TaskContext` whose `emit` is narrowed to a declared event-name → + * payload-shape map. Plugins opt in via the `TEvents` parameter on + * {@link TaskDefinition}; the default keeps the engine's looser + * `(string, any)` shape. + * * @public */ +export interface TypedTaskContext> + extends Omit { + emit( + name: K, + payload: TEvents[K], + ): Promise; +} + +/** + * Reads the OBO `UserContext` AppKit forwards through `ctx.context` on + * `executeTask`. Returns `null` for SP runs, recovery without an + * explicit `{ context }`, or any payload that fails the + * `isUserContext` discriminator (defends against stale / wrong-shape + * sidecars). + * + * @example + * ```ts + * const userCtx = userContextFromTaskCtx(ctx); + * if (userCtx) return runInUserContext(userCtx, () => doWork(input)); + * return doWork(input); + * ``` + * + * @public + */ +export function userContextFromTaskCtx( + ctx: Pick, +): UserContext | null { + const value = ctx.context as unknown; + if ( + value !== null && + typeof value === "object" && + "isUserContext" in value && + (value as { isUserContext?: unknown }).isUserContext === true && + "userId" in value && + typeof (value as { userId?: unknown }).userId === "string" + ) { + return value as UserContext; + } + return null; +} + +/** + * Durable task registration accepted by {@link TaskflowService.task}. + * `TEvents` ties `ctx.emit(name, payload)` to a typed map — those names + * and shapes are exactly the SSE wire frames the client sees. + * + * @public + */ +export interface TaskDefinition< + TInput = unknown, + TResult = unknown, + TEvents extends Record = Record, +> { + name: string; + execute(input: TInput, ctx: TypedTaskContext): Promise; + recover?(input: TInput, ctx: TypedTaskContext): Promise; + autoRecover?: boolean; +} + +/** + * Branded handle returned by {@link TaskflowService.task}. Carries the + * registered name plus phantom type parameters so callers that take + * `TaskHandle | string` can infer input and event shapes without a + * redundant generic. + * + * @public + */ +export interface TaskHandleRef< + TInput = unknown, + TResult = unknown, + TEvents extends Record = Record, +> { + readonly name: string; + /** Phantom marker dragging `TInput`/`TResult`/`TEvents` into the type. Never read at runtime. @internal */ + readonly __taskTypes?: { + input: TInput; + result: TResult; + events: TEvents; + }; +} + +/** Re-exported engine types used in the public `this.taskflow.*` surface. @public */ export type { ResumeOptions, + SseEvent, StopOptions, StreamEvent, SubmitOptions, Task, + TaskContext, TaskEvent, TaskflowConfig, TaskHandle, }; +export type { TaskHandleRef as TaskRef }; + /** - * Single instance per AppKit app, booted by `createApp` and exposed to - * plugins as `this.taskflow`. + * SSE wire helpers for plugins that bridge a subscription by hand + * (e.g. reconnect routes keyed on an existing IK). Same wire format as + * the `executeTask` bridge. + * @public */ +export { setupSseHeaders, writeSseFrame }; + +/** + * Response header carrying the engine-derived idempotency key. Canonical + * declaration in `./execute-task.ts`; re-exported so the public surface + * lives under one import path. + * @public + */ +export { TASKFLOW_IK_HEADER } from "./execute-task"; + +/** + * Settings for `Plugin.executeTask`. Deliberately disjoint from + * `PluginExecutionSettings`: TaskFlow handles retry / dedup / timeout + * natively, so those knobs are typed `never` to fail at compile time. + * + * @example + * ```ts + * await this.executeTask(res, "agent-loop", req.body, { + * cancelOnDisconnect: false, // long-running OBO tasks survive reconnect + * }); + * ``` + * + * @public + */ +export interface ExecuteTaskSettings { + /** + * Issue cooperative `taskflow.stop()` when the SSE client disconnects. + * Default `true`. Set `false` for OBO tasks the user reconnects to + * (engine keeps writing the WAL; reconnects replay via `Last-Event-ID`). + * + * Note: OBO tokens expire in 1 hour. For longer runs, register with + * `autoRecover: false` and resume from a fresh authenticated request. + */ + cancelOnDisconnect?: boolean; + /** + * Grace window before `taskflow.stop()` after client close. Default + * `5000`. Bridges the "wifi blipped for 2 s" reconnect case. Set `0` + * for cancel-immediately. + */ + disconnectGraceMs?: number; + /** + * Idempotency strictness for `engine.submit`. + * + * - `at_least_once` (default): cache-backed dedup, fast path. Two + * pods may both submit within the cache window — fine for + * idempotent reads. + * - `at_most_once`: queries storage before creating the task, so + * cross-pod uniqueness holds. Use for non-idempotent side effects + * (DML, external writes, billing). Costs single-digit ms on submit. + * + * Cross-pod uniqueness requires a shared storage backend + * (`storage.backend: "lakebase"`) — default per-pod SQLite cannot + * coordinate. + */ + executeMode?: "at_least_once" | "at_most_once"; + telemetry?: { + /** Default: true. */ + traces?: boolean; + /** Default: true. */ + metrics?: boolean; + }; + + // Typed `never`: rejected at compile time so a settings object copied + // from `execute()` / `executeStream()` errors clearly. + + /** Forbidden: TaskFlow handles retry via `recover` on `TaskDefinition`. */ + retry?: never; + /** Forbidden: TaskFlow dedupes by idempotency key. */ + cache?: never; + /** Forbidden: TaskFlow uses cooperative `stop()` + `staleThresholdMs`. */ + timeout?: never; + /** Forbidden: wire shape is fixed by what the handler emits. */ + stream?: never; + /** + * Forbidden: identity comes from the active `runInUserContext` scope + * (`asUser(req)` proxy). Accepting a raw `userId` would let callers + * forge ownership for `stop()` / `resume()`. + */ + userId?: never; +} + +/** Captured per-registration so we can surface diagnostics (OBO + autoRecover). @internal */ +interface TaskRegistrationRecord { + autoRecover: boolean; + hasRecover: boolean; +} + +/** + * Active SSE bridge handle. The service keeps a set of these so it can + * drain in-flight bridges (write a final `event: error` frame) before + * the engine closes their iterators on shutdown. + * @internal + */ +export interface ActiveBridge { + /** For log context only — never used for ownership. */ + idempotencyKey: string; + /** Best-effort: write SSE error frame, stop subscribing. Errors are swallowed. */ + drain(reason: string): void; +} + +/** Singleton per AppKit app. Booted by `createApp`, exposed as `this.taskflow`. */ export class TaskflowService { private static _instance: TaskflowService | null = null; private readonly engine: TaskflowEngine; + private readonly registrations: Map = + new Map(); + private readonly activeBridges: Set = new Set(); private hasShutdown = false; private constructor(engine: TaskflowEngine) { this.engine = engine; } - /** - * Bootstraps the service. Pass `false` to opt out (returns `null`). - * Idempotent: subsequent calls return the existing instance. - */ + /** Idempotent. Pass `false` to opt out and return `null`. */ static async initialize( config: TaskflowConfig | false | undefined, ): Promise { @@ -244,7 +510,7 @@ export class TaskflowService { if (!TaskflowService._instance) { throw InitializationError.notInitialized( "TaskflowService", - "Either createApp has not run yet, or it was started with `taskflow: false`. Remove the opt-out to use this.taskflow.", + "createApp not run yet, or started with `taskflow: false`.", ); } return TaskflowService._instance; @@ -255,20 +521,101 @@ export class TaskflowService { return TaskflowService._instance; } - /** - * Test-only singleton reset. Hard-fails in production. Vendor module - * cache is left intact (Node already memoises it). - * @internal - */ + /** Test-only. Hard-fails in production. @internal */ static _resetForTests(): void { if (process.env.NODE_ENV === "production") { throw new Error( - "TaskflowService._resetForTests() is test-only and refuses to run when NODE_ENV=production.", + "TaskflowService._resetForTests() refuses to run when NODE_ENV=production.", ); } TaskflowService._instance = null; } + /** + * Registers a durable task. Call from the plugin's `setup()` hook + * with handlers bound to the plugin instance. `TEvents` constrains + * `ctx.emit(name, payload)`, which is also the SSE wire shape. + * + * @example + * ```ts + * type AgentEvents = { turn_done: { turn: number; result: string } }; + * + * this.taskflow.task({ + * name: "agent-loop", + * execute: async (input, ctx) => { + * await ctx.emit("turn_done", { turn: 1, result: "ok" }); // ✓ + * await ctx.emit("typo", {}); // ✗ compile error + * }, + * autoRecover: false, + * }); + * ``` + */ + task< + TInput = unknown, + TResult = unknown, + TEvents extends Record = Record, + >( + definition: TaskDefinition, + ): TaskHandleRef { + this.assertAlive(); + if (this.registrations.has(definition.name)) { + // Throw in prod so a duplicate doesn't silently shadow the first + // handler (recovery worker would route in-flight tasks to a stale + // closure). Warn-only in dev so HMR loops keep working. + const message = `Task "${definition.name}" is already registered.`; + if (process.env.NODE_ENV === "production") { + throw new Error(message); + } + logger.warn(`${message} (allowed in non-production)`); + } + // TODO(taskflow#engine-binding-reconciliation): vendored + // `registerTask` is positional at runtime even though the .d.ts + // documents the object form. `TypedTaskContext` is compile-time + // only — at runtime emit is unconstrained `(string, any)`. + const recoverFn = definition.recover ?? null; + const registerOpts = + definition.autoRecover === undefined + ? null + : { autoRecover: definition.autoRecover }; + ( + this.engine as unknown as { + registerTask( + n: string, + exec: (input: unknown, ctx: TaskContext) => Promise, + recover: + | ((input: unknown, ctx: TaskContext) => Promise) + | null, + opts: { autoRecover?: boolean } | null, + ): void; + } + ).registerTask( + definition.name, + definition.execute as unknown as ( + input: unknown, + ctx: TaskContext, + ) => Promise, + recoverFn as unknown as + | ((input: unknown, ctx: TaskContext) => Promise) + | null, + registerOpts, + ); + this.registrations.set(definition.name, { + autoRecover: definition.autoRecover ?? true, + hasRecover: typeof definition.recover === "function", + }); + return { name: definition.name } as TaskHandleRef; + } + + /** Process-local lookup; not cross-pod. */ + hasTask(name: string): boolean { + return this.registrations.has(name); + } + + /** Used by `executeTask` to surface OBO misconfigurations. @internal */ + getRegistration(name: string): TaskRegistrationRecord | undefined { + return this.registrations.get(name); + } + /** Bootstraps the service. Returns `null` when opted out. */ static async boot( config: TaskflowConfig | false | undefined, @@ -278,11 +625,7 @@ export class TaskflowService { return { instance: service, stop: () => service.shutdown() }; } - /** - * Spawns a new task attempt. Returns a handle even when a task with - * the same idempotency key already exists — dedup is resolved by the - * engine based on `executeMode`. - */ + /** Returns a handle even on dedup hits — engine resolves via `executeMode`. */ async start( name: string, input: unknown, @@ -292,10 +635,7 @@ export class TaskflowService { return this.engine.submit(name, input, options); } - /** - * Async iterable of `StreamEvent`s ordered by sequence number. Pass - * `lastSeq` to resume from a known position (SSE reconnection). - */ + /** Pass `lastSeq` to resume from a known WAL position (SSE reconnect). */ subscribe( idempotencyKey: string, lastSeq?: number, @@ -304,7 +644,7 @@ export class TaskflowService { return this.engine.subscribe(idempotencyKey, lastSeq); } - /** Returns the current task record, or null if not found / unauthorized. */ + /** Returns null if not found or unauthorized. */ async reconnect( idempotencyKey: string, userId?: string, @@ -313,10 +653,7 @@ export class TaskflowService { return this.engine.reconnect(idempotencyKey, userId); } - /** - * Revives a suspended task — after a deliberate `stop()`, or after a - * crash for an OBO task (where auto-recovery is disabled). - */ + /** Revives a suspended task (after `stop()` or crash with `autoRecover: false`). */ async resume( idempotencyKey: string, options: ResumeOptions = {}, @@ -325,7 +662,7 @@ export class TaskflowService { return this.engine.resume(idempotencyKey, options); } - /** Cooperative stop. Emits a `suspended` event. Idempotent. */ + /** Cooperative. Emits `suspended`. Idempotent. */ async stop( idempotencyKey: string, options: StopOptions = {}, @@ -334,21 +671,55 @@ export class TaskflowService { return this.engine.stop(idempotencyKey, options); } - /** - * Test-only: aborts the executor mid-run without writing a terminal - * event so reconnect/recovery exercises the crash path. Throws unless - * `enableTestMode: true`. - */ + /** Test-only crash injection. Throws unless `enableTestMode: true`. */ simulateCrash(idempotencyKey: string): void { this.assertAlive(); this.engine.simulateCrash(idempotencyKey); } - /** Drains in-flight tasks and shuts the engine down. Idempotent. */ + /** + * Registers an SSE bridge so shutdown can drain it with a graceful + * `error: server_shutting_down` frame before the engine closes the + * subscription. Returns an unregister callback. @internal + */ + _registerBridge(bridge: ActiveBridge): () => void { + if (this.hasShutdown) { + try { + bridge.drain("server_shutting_down"); + } catch (err) { + logger.debug("Bridge drain after shutdown threw: %O", err); + } + return () => {}; + } + this.activeBridges.add(bridge); + return () => { + this.activeBridges.delete(bridge); + }; + } + + /** Idempotent. */ async shutdown(): Promise { if (this.hasShutdown) return; this.hasShutdown = true; logger.info("Shutting down TaskFlow engine"); + // Drain bridges before the engine — otherwise iterators close and + // clients see a silent EOF instead of an actionable error frame. + if (this.activeBridges.size > 0) { + logger.debug( + `Draining ${this.activeBridges.size} active TaskFlow SSE bridge(s) before engine shutdown`, + ); + for (const bridge of this.activeBridges) { + try { + bridge.drain("server_shutting_down"); + } catch (err) { + logger.debug( + `Bridge drain failed for IK ${bridge.idempotencyKey}: %O`, + err, + ); + } + } + this.activeBridges.clear(); + } await this.engine.shutdown(); if (TaskflowService._instance === this) { TaskflowService._instance = null; diff --git a/packages/appkit/src/taskflow/sse.ts b/packages/appkit/src/taskflow/sse.ts new file mode 100644 index 000000000..f3c75df4a --- /dev/null +++ b/packages/appkit/src/taskflow/sse.ts @@ -0,0 +1,127 @@ +/** + * SSE wire format for the TaskFlow bridge. Uses the engine `streamSeq` + * as the SSE `id:` so client `Last-Event-ID` reconnects resume against + * the WAL. + */ +import type { Response } from "express"; + +/** + * Event names the bridge writes itself, or that collide with engine + * terminal frames. Plugin code must NOT emit these via `ctx.emit` — + * `event: completed` would close the EventSource on the client. + * @internal + */ +export const RESERVED_BRIDGE_EVENT_NAMES = new Set([ + // Bridge-internal frames. + "ready", + "error", + // Engine wire vocabulary. + "heartbeat", + "completed", + "failed", + "cancelled", + "suspended", +]); + +/** + * SSE frame. `data` is pre-serialised so callers can emit JSON, plain + * text, or anything else without the writer touching the payload. + * @public + */ +export interface SseEvent { + /** SSE `event:` field name (e.g. "tick", "completed", "data"). */ + event: string; + /** Pre-serialised payload for the SSE `data:` field. */ + data: string; + /** + * Optional SSE `id:` echoed back via `Last-Event-ID` on reconnect. + * The bridge sets it to the engine `streamSeq`. + */ + id?: string | number; +} + +/** + * SSE injection guard: event names live on a single line, so embedded + * CR/LF would let an attacker close one event and start another. Refuse + * rather than strip — caller has a bug to fix. + * @internal + */ +function assertSafeEventName(name: string): void { + if (/[\r\n]/.test(name)) { + throw new Error( + `SSE event name must not contain CR/LF: ${JSON.stringify(name)}`, + ); + } +} + +/** Same rationale as {@link assertSafeEventName}. @internal */ +function assertSafeId(id: string | number): void { + if (typeof id === "number") return; + if (/[\r\n]/.test(id)) { + throw new Error(`SSE id must not contain CR/LF: ${JSON.stringify(id)}`); + } +} + +/** + * Re-encode `data` as one or more `data:` lines per the SSE spec. + * EventSource joins them with `\n`, so multi-line payloads round-trip + * losslessly while every wire line still starts with `data:` (denies + * the "embedded CRLF dispatches a synthetic event" attack). + * @internal + */ +function formatDataField(data: string): string { + return data + .replace(/\r\n?/g, "\n") + .split("\n") + .map((line) => `data: ${line}`) + .join("\n"); +} + +/** + * Writes the SSE handshake (200 + content type + cache controls). + * `flushHeaders()` so intermediaries see the response start before the + * first event lands. `Vary: Origin` so a CDN that caches an SSE + * response (despite `no-cache`) cannot serve it cross-origin. + * @public + */ +export function setupSseHeaders(res: Response): void { + if (res.headersSent) return; + res.statusCode = 200; + res.setHeader("Content-Type", "text/event-stream; charset=utf-8"); + res.setHeader("Cache-Control", "no-cache, no-transform"); + // Disables nginx-style proxy buffering used by Cloudflare / AWS / GCP. + res.setHeader("X-Accel-Buffering", "no"); + res.setHeader("Vary", "Origin"); + res.flushHeaders?.(); +} + +/** + * Writes one SSE frame. Returns `false` once the response has ended so + * callers can break out of the subscribe loop. Throws on CR/LF in + * `event` / `id` (header-injection style). + * @public + */ +export function writeSseFrame(res: Response, frame: SseEvent): boolean { + if (res.writableEnded) return false; + assertSafeEventName(frame.event); + if (frame.id !== undefined && frame.id !== null) { + assertSafeId(frame.id); + res.write(`id: ${frame.id}\n`); + } + res.write(`event: ${frame.event}\n`); + res.write(`${formatDataField(frame.data)}\n\n`); + return true; +} + +/** + * Writes an SSE comment (`: \n\n`). Ignored by EventSource but + * keeps the connection alive through proxies that drop idle sockets. + * Used by the bridge to translate engine `heartbeat` into wire + * keep-alives without surfacing them as application events. + * @internal + */ +export function writeSseComment(res: Response, text: string): boolean { + if (res.writableEnded) return false; + res.write(`: ${text}\n\n`); + return true; +} diff --git a/tools/test-helpers.ts b/tools/test-helpers.ts index 9967df7da..30252e26a 100644 --- a/tools/test-helpers.ts +++ b/tools/test-helpers.ts @@ -1,3 +1,4 @@ +import { createHash } from "node:crypto"; import type { Span, SpanOptions } from "@opentelemetry/api"; import type { IAppRouter } from "shared"; import { vi } from "vitest"; @@ -314,32 +315,50 @@ export async function runWithRequestContext( } /** - * Parses SSE response. Format: "event: result\ndata: {...}\n\n" + * Parses an SSE response body and returns one frame's data merged with + * `{ eventType }`. Handles multi-line `data:` payloads, CRLF, and SSE + * comments. Pass `eventType` to pick a specific frame; without it, the + * last frame wins. */ -export async function parseSSEResponse(response: Response): Promise { - const text = await response.text(); - const lines = text.split("\n"); - - let eventType: string | null = null; - let dataLine: string | null = null; - - for (const line of lines) { - if (line.startsWith("event: ")) { - eventType = line.substring(7).trim(); - } else if (line.startsWith("data: ")) { - dataLine = line.substring(6); +export async function parseSSEResponse( + response: Response, + options: { eventType?: string } = {}, +): Promise { + const text = (await response.text()).replace(/\r\n?/g, "\n"); + const target = options.eventType; + + let chosenEventType: string | null = null; + let chosenDataLines: string[] | null = null; + let lastEventType: string | null = null; + let lastDataLines: string[] | null = null; + + for (const frame of text.split("\n\n")) { + let eventType: string | null = null; + const dataLines: string[] = []; + for (const line of frame.split("\n")) { + if (line.startsWith(":")) continue; + if (line.startsWith("event: ")) eventType = line.substring(7).trim(); + else if (line.startsWith("data: ")) dataLines.push(line.substring(6)); + } + if (dataLines.length === 0) continue; + lastEventType = eventType; + lastDataLines = dataLines; + if (target && eventType === target) { + chosenEventType = eventType; + chosenDataLines = dataLines; } } - if (!dataLine) { - throw new Error(`No data found in SSE response: ${text}`); + const eventType = target ? chosenEventType : lastEventType; + const dataLines = target ? chosenDataLines : lastDataLines; + + if (!dataLines || dataLines.length === 0) { + throw new Error( + `No ${target ? `${target} ` : ""}data found in SSE response: ${text}`, + ); } - const parsed = JSON.parse(dataLine); - return { - eventType, - ...parsed, - }; + return { eventType, ...JSON.parse(dataLines.join("\n")) }; } export function createConfigurableMockWorkspaceClient() { @@ -394,3 +413,212 @@ export function createFailedSQLResponse(errorMessage: string) { statement_id: `stmt-${Date.now()}`, }; } + +/** + * In-process stand-in for `TaskflowService` that runs the handler + * directly inside `subscribe()` instead of through the vendored Rust + * engine. Use for unit tests where the real engine (SQLite WAL, + * recovery worker, FFI sidecar) is overkill. + * + * - `start()` keys runs by an engine-shaped IK + * (`sha256(name || canon(input) || userId)`). + * - `subscribe()` yields every `ctx.emit(name, payload)` as + * `custom:` then a single `completed` / `failed` terminal. + * - `_emitHeartbeat` / `_emitStepCheckpoint` exercise the bridge + * filters without booting the engine. + * + * Skips real recovery, storage dedup, and IK-cache eviction — for + * those, run against `createApp(...)`. + */ +export function createStubTaskflowService() { + type TaskDef = { + name: string; + execute: (input: unknown, ctx: any) => Promise; + recover?: (input: unknown, ctx: any) => Promise; + autoRecover?: boolean; + }; + + const tasks = new Map(); + const stashedRuns = new Map< + string, + { + name: string; + input: unknown; + opts: { userId?: string; context?: unknown }; + } + >(); + /** Pre-injected events keyed by IK; yielded ahead of handler events. */ + const injectedEvents = new Map< + string, + Array<{ event: any; streamSeq: number }> + >(); + let seq = 0; + + // Mirrors the engine IK shape (sha256 hex; the engine emits base64 + // but tests only need stable equality, not byte-for-byte parity). + const canonicalize = (value: unknown): string => { + if (value === null || value === undefined) return "null"; + if (typeof value !== "object") return JSON.stringify(value); + if (Array.isArray(value)) { + return `[${value.map(canonicalize).join(",")}]`; + } + const keys = Object.keys(value as Record).sort(); + const parts = keys.map( + (k) => + `${JSON.stringify(k)}:${canonicalize((value as Record)[k])}`, + ); + return `{${parts.join(",")}}`; + }; + const computeIK = (name: string, input: unknown, userId?: string) => { + const payload = `${name}|${canonicalize(input)}|${userId ?? ""}`; + return createHash("sha256").update(payload).digest("hex"); + }; + + const stub = { + task: vi.fn((def: TaskDef) => { + tasks.set(def.name, def); + }), + + start: vi.fn( + async ( + name: string, + input: unknown, + opts: { userId?: string; context?: unknown } = {}, + ) => { + const ik = computeIK(name, input, opts.userId); + stashedRuns.set(ik, { name, input, opts }); + return { taskId: ik, idempotencyKey: ik }; + }, + ), + + subscribe: vi.fn((ik: string) => { + const run = stashedRuns.get(ik); + const def = run ? tasks.get(run.name) : undefined; + + return (async function* () { + if (!run || !def) return; + + // Pre-injected events first (exercises bridge filters). + const pre = injectedEvents.get(ik); + if (pre) { + for (const e of pre) yield e; + } + + const events: Array<{ event: any; streamSeq: number }> = []; + const ctx = { + taskId: ik, + idempotencyKey: ik, + userId: run.opts.userId ?? null, + attempt: 1, + previousEvents: [], + isRecovery: false, + context: run.opts.context ?? null, + emit: async (eventType: string, payload?: unknown) => { + events.push({ + event: { + id: "", + taskId: ik, + idempotencyKey: ik, + seq: ++seq, + eventType: `custom:${eventType}`, + timestampMs: Date.now(), + payload, + }, + streamSeq: seq, + }); + }, + heartbeat: async () => {}, + }; + + try { + const result = await def.execute(run.input, ctx); + events.push({ + event: { + id: "", + taskId: ik, + idempotencyKey: ik, + seq: ++seq, + eventType: "completed", + timestampMs: Date.now(), + payload: { result }, + }, + streamSeq: seq, + }); + } catch (err) { + events.push({ + event: { + id: "", + taskId: ik, + idempotencyKey: ik, + seq: ++seq, + eventType: "failed", + timestampMs: Date.now(), + payload: { + error: err instanceof Error ? err.message : String(err), + }, + }, + streamSeq: seq, + }); + } + + for (const e of events) yield e; + })(); + }), + + stop: vi.fn(async (ik: string) => ({ taskId: ik, idempotencyKey: ik })), + resume: vi.fn(async () => null), + reconnect: vi.fn(async () => null), + simulateCrash: vi.fn(), + + hasTask: vi.fn((name: string) => tasks.has(name)), + getRegistration: vi.fn((name: string) => { + const t = tasks.get(name); + return t + ? { autoRecover: t.autoRecover ?? true, hasRecover: !!t.recover } + : undefined; + }), + + // No-op: tests don't simulate shutdown drainage. + _registerBridge: vi.fn(() => () => {}), + + /** Queue a `heartbeat` engine event ahead of handler events. */ + _emitHeartbeat: (ik: string) => { + const list = injectedEvents.get(ik) ?? []; + list.push({ + event: { + id: "", + taskId: ik, + idempotencyKey: ik, + seq: ++seq, + eventType: "heartbeat", + timestampMs: Date.now(), + payload: null, + }, + streamSeq: seq, + }); + injectedEvents.set(ik, list); + }, + + /** Queue a `custom:step:*` checkpoint event (WAL-only, dropped on the wire). */ + _emitStepCheckpoint: (ik: string, name: string, value?: unknown) => { + const list = injectedEvents.get(ik) ?? []; + list.push({ + event: { + id: "", + taskId: ik, + idempotencyKey: ik, + seq: ++seq, + eventType: `custom:step:${name}`, + timestampMs: Date.now(), + payload: value, + }, + streamSeq: seq, + }); + injectedEvents.set(ik, list); + }, + + shutdown: vi.fn(async () => {}), + }; + + return stub; +}