diff --git a/.claude/references/tasks.md b/.claude/references/tasks.md
new file mode 100644
index 000000000..a9d2dfe11
--- /dev/null
+++ b/.claude/references/tasks.md
@@ -0,0 +1,404 @@
+# Durable Task Best Practices
+
+Reference guide for using the durable task service inside AppKit plugins. Durable execution is built in: every plugin gets `this.task` (unless you opt out with `createApp({ task: false })`) and `this.executeTask` on `Plugin`.
+
+Every guideline is prefixed with a severity tier:
+
+- **NEVER** — Security, correctness, or breakage blocker. Violating this corrupts state, double-charges users, or loses durability guarantees.
+- **MUST** — Correctness requirement. Violating this produces bugs, broken recovery, or inconsistent behavior.
+- **SHOULD** — Quality recommendation. Violating this degrades DX, performance, or maintainability.
+
+> **Scope:** the task service used from inside core or custom AppKit plugins (`packages/appkit/src/plugins/` or external). For the underlying engine architecture, see the upstream engine repo's `docs/ARCHITECTURE.md` and `docs/ONE_PAGER.md`.
+
+**Imports.** AppKit surfaces task types and helpers from `@databricks/appkit` (for example `step`, `TaskDefinition`, `TypedTaskContext`, `TaskContext`, `TASK_IDEMPOTENCY_HEADER`, `setupSseHeaders`, `writeSseFrame`). The vendored native engine lives under `packages/appkit/vendor/taskflow/`; do not import the vendored `Taskflow` facade from that path inside plugins — use `this.task` instead.
+
+---
+
+## 1. When to Use Durable Tasks
+
+The decision is binary. Reach for it when **at least one** of these is true:
+
+- Operation runs longer than ~5 seconds and the user/client expects to see progress.
+- Re-running from scratch on crash is **incorrect** (side effects already persisted) or **expensive** (compute, money, time).
+- The operation must survive process restart (rolling deploy, crash, host reclaim).
+- Multiple steps each have their own success/failure semantics that need to be tracked.
+
+**MUST** use `this.executeTask(res, name, input, settings?)` — not `this.execute()` — for any operation matching the above. Do not try to reinvent durability with the `retry` interceptor plus ad-hoc disk writes.
+
+**MUST NOT** use it for sub-second reads, stateless transformations, or fire-and-forget logging. The WAL append cost is small but the conceptual overhead (task naming, idempotency keys, recovery) is not. Use `this.execute()` with retry/cache/timeout interceptors instead.
+
+| Method | Latency budget | Crash survival | Use when |
+|---|---|---|---|
+| `this.execute(fn, settings)` | < 5s typical | None | Read with retry/cache/timeout |
+| `this.executeStream(res, gen, settings)` | seconds to minutes | None | Live progress; loss-on-crash acceptable |
+| `this.executeTask(res, name, input, settings?)` | seconds to hours | Full | Anything you would be sad to re-run from scratch |
+
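+A quick contrast of the two rows, as a sketch (handler names and the interceptor option shapes are placeholders — this guide only names the `retry`/`cache`/`timeout` keys, not their value types):
+
+```typescript
+// Fast read — no durability; interceptors handle transient failure:
+const stats = await this.execute(() => this.readStats(), { timeout: 5_000 }); // shape assumed
+
+// Crash-survivable work — durable task, streamed over SSE:
+await this.executeTask(res, "export-data", req.body, { cancelOnDisconnect: true });
+```
+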
+---
+
+## 2. Task Registration
+
+**MUST** register every durable task in `setup()`, never in `injectRoutes()` or constructors. `setup()` runs once after plugins boot; route handlers run per request.
+
+**MUST** pass a single **`TaskDefinition`** object to `this.task.task(definition)`. The shipped shape is `{ name, execute, recover?, autoRecover? }`, not separate positional arguments.
+
+```typescript
+import type { TypedTaskContext } from "@databricks/appkit";
+
+async setup() {
+ this.task.task({
+ name: "agent-loop",
+ execute: (input, ctx) => this.runAgentLoop(input, ctx),
+ recover: (input, ctx) => this.recoverAgentLoop(input, ctx),
+ autoRecover: true,
+ });
+ this.task.task({
+ name: "export-data",
+ execute: (input, ctx) => this.exportData(input, ctx),
+ autoRecover: true,
+ });
+}
+```
+
+**MUST** use lowercase kebab-case (or a consistent colon-separated prefix convention such as `my-plugin:query`) for the `name` field. The string you pass is the task name the engine stores; AppKit does not auto-prefix it. Multiple plugins should choose names that do not collide.
+
+**NEVER** register the same `name` twice expecting two handlers. The service **warns** and the new registration **replaces** the previous one; recovery and routing become confusing.
+
+**NEVER** change a task's `name` across releases without a migration plan. Names are persisted in the WAL — renaming orphans in-flight tasks across deploy.
+
+**MUST** understand `autoRecover`: the service records `definition.autoRecover ?? true`. So **`true` is the default for all tasks**, including OBO tasks. There is no special default that flips off for OBO. For OBO workloads, you **MUST** set `autoRecover: false` yourself: the recovery worker has no `UserContext`, so automatic recovery will break `asUser`-style calls after restart.
+
+**MUST** treat the runtime warning in `executeTask` honestly: if you register an OBO task with `autoRecover: true` (explicitly or by default), AppKit logs a **once-per-(plugin,task)** warning that recovery after restart will run without the original `UserContext`. That is guidance, not a change to the default — **fix the registration** (`autoRecover: false` + explicit `resume`) rather than relying on the warning.
+
+**SHOULD** provide `recover` when re-running `execute` from scratch would be expensive or unsafe. Without `recover`, the engine may still re-invoke the handler on recovery per engine semantics; `recover` is your typed hook when you need custom resumption logic.
+
+**SHOULD** use `this.task.task({ ... })` with a third generic (`Events`) so `ctx.emit` is tied to a known event map — the same names and payload shapes are what the SSE bridge exposes to clients.
+
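+A hedged sketch of that pattern — the generic parameter order (`<Input, Output, Events>`) and the illustrative `ExportInput`/`ExportResult` types are assumptions; check `TaskDefinition`'s actual signature:
+
+```typescript
+type ExportEvents = {
+  progress: { pct: number };
+  stage_done: { stage: string };
+};
+
+this.task.task<ExportInput, ExportResult, ExportEvents>({
+  name: "export-data",
+  execute: async (input, ctx) => {
+    await ctx.emit("progress", { pct: 10 }); // payload checked against ExportEvents["progress"]
+    return this.exportData(input, ctx);
+  },
+});
+```
+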
+---
+
+## 3. Idempotency Keys
+
+The engine derives each task's idempotency key (IK) from the **task name**, a **canonical form of the input**, and the **submit-time `userId`** (from the active user context when OBO, or absent for service-principal runs). Plugin code does **not** pass an IK override through `executeTask` — `ExecuteTaskSettings` has no `idempotencyKey` field.
+
+**MUST** treat the IK as the **identity of the logical task**. Duplicate submits with the same name + input + owner dedupe per the engine's `executeMode` (`SubmitOptions`).
+
+**MUST** design `input` so that everything that should distinguish two logical runs is **on the input object**. Example: the analytics plugin includes `queryKey`, statement, parameters, executor key, and format discriminator so distinct client operations do not collide.
+
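+A sketch of that shape (field names are illustrative, not the analytics plugin's actual type):
+
+```typescript
+// Everything that distinguishes two logical runs lives on the input —
+// and nothing that shouldn't (no timestamps, no attempt counters).
+interface QueryTaskInput {
+  queryKey: string;                    // client operation identity
+  statement: string;                   // the SQL text
+  parameters: Record<string, unknown>; // bound parameters
+  executorKey: string;                 // which executor/warehouse
+  format: "json" | "arrow";            // format discriminator
+}
+```
+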
+**SHOULD** return the IK to clients when they need reconnect or `/resume` / `/stop` follow-ups. The `executeTask` bridge sets the HTTP header **`TASK_IDEMPOTENCY_HEADER`** (`"X-AppKit-Task-Idempotency-Key"`) and sends an initial SSE frame `event: ready` with `data: {"idempotencyKey":...}` so cross-origin `EventSource` clients that cannot read headers still get the key. Import `TASK_IDEMPOTENCY_HEADER` from `@databricks/appkit` if you read it in middleware or tests.
+
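+For same-origin clients that can read response headers, a minimal sketch (the route path is hypothetical):
+
+```typescript
+const resp = await fetch("/api/my-plugin/run", {
+  method: "POST",
+  headers: { "content-type": "application/json" },
+  body: JSON.stringify(input),
+});
+// The bridge sets this header before writing the first SSE frame.
+const ik = resp.headers.get("X-AppKit-Task-Idempotency-Key");
+```
+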
+**NEVER** assume you can forge ownership by passing a user id from the client. `ExecuteTaskSettings` explicitly forbids a `userId` field (`never`) — identity comes only from `runInUserContext` / `asUser(req)`.
+
+**NEVER** fold attempt counters, retry numbers, or timestamps into inputs when they would change what should be the **same** logical task — that creates accidental IK churn. Conversely, do not omit fields from input that should separate two runs.
+
+**SHOULD** switch the storage backend for real multi-pod deployment: if the runtime looks like **Databricks Apps** (`DATABRICKS_APP_NAME`, `DATABRICKS_APP_ID`, or `DATABRICKS_APP_URL`) and you still use the default **SQLite** backend, AppKit logs a **WARN** that tasks will not survive rolling restarts; plan for **`lakebase`** (Postgres) or another shared backend via `createApp({ task: { storage: … } })`.
+
+---
+
+## 4. Handler Signature
+
+The handler signature is fixed on `TaskDefinition.execute`:
+
+```typescript
+execute(input: TInput, ctx: TypedTaskContext): Promise<TOutput>
+```
+
+**`ctx` provides** (engine `TaskContext`, narrowed for `emit` when you use `TypedTaskContext`):
+
+| Field | Type | Use |
+|---|---|---|
+| `ctx.emit(name, payload)` | `(name: string, payload: unknown) => Promise<void>` | Append a user event; becomes SSE after bridge rules below |
+| `ctx.isRecovery` | `boolean` | `true` if this invocation is a recovery path |
+| `ctx.previousEvents` | `TaskEvent[]` | Historical events for this task; inspect in recovery |
+| `ctx.context` | `unknown \| null` | Live sidecar (e.g. `UserContext`); **not** persisted across crash |
+| `ctx.idempotencyKey` | `string` | IK for this task |
+| `ctx.attempt` | `number` | Attempt counter |
+| `ctx.taskId`, `ctx.userId`, `ctx.heartbeat()` | … | See engine types in `task.d.ts` |
+
+**MUST** treat the handler as a function that may run in another process after failure. Durable state must come from `input`, `ctx.previousEvents`, or external storage — not from closures that assume a single long-lived Node process.
+
+**MUST** branch on `ctx.isRecovery` when semantics differ after restart.
+
+**MUST** await every `ctx.emit()` call.
+
+**MUST** when reading `ctx.previousEvents`, compare against engine **`eventType` strings as stored**, not the short names you passed to `emit`. User emits are persisted with a **`custom:`** prefix — e.g. `ctx.emit("tick", …)` produces `eventType: "custom:tick"`. The **`executeTask` SSE bridge strips `custom:` for frames sent to the client**, but **`previousEvents` in the handler still use the prefixed form.**
+
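+The same emit, seen from both sides:
+
+```typescript
+await ctx.emit("tick", { n: 1 });
+// Client wire (bridge strips the prefix):  event: tick
+// WAL / ctx.previousEvents (as stored):    eventType === "custom:tick"
+const sawTick = ctx.previousEvents.some((e) => e.eventType === "custom:tick");
+```
+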
+**NEVER** emit custom events whose **short name** (after `custom:` is stripped for wire purposes) matches any reserved bridge or terminal name. If you `ctx.emit` one of these, the bridge **logs a warning and drops** the frame so clients do not get misleading terminal or control events:
+
+- `ready`, `error`, `heartbeat`, `completed`, `failed`, `cancelled`, `suspended`
+
+Choose alternatives such as `task_completed` or `query_tick`.
+
+**SHOULD** keep payloads JSON-friendly. The bridge serialises with `JSON.stringify` and a **BigInt** replacer (values become **decimal strings** on the wire) so warehouse `LONG`/`BIGINT` columns do not throw at serialisation time.
+
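+Illustratively, a replacer of this shape (the bridge's exact implementation may differ):
+
+```typescript
+JSON.stringify(
+  { rowCount: 9007199254740993n },
+  (_key, value) => (typeof value === "bigint" ? value.toString() : value),
+);
+// => '{"rowCount":"9007199254740993"}' — BigInt arrives as a decimal string.
+```
+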
+---
+
+## 5. The `step()` Helper (Not a Decorator)
+
+AppKit exports **`step`** as a **higher-order function** from `@databricks/appkit`. There is **no** `@step` decorator on `Plugin`.
+
+```typescript
+import { step, type TaskContext } from "@databricks/appkit";
+
+// Name the wrapped function (see Semantics below) and capture dependencies
+// outside it, so the named function expression doesn't rely on lexical `this`.
+const invoiceClient = this.invoiceClient;
+const fetchInvoices = step(async function fetchInvoices(ctx: TaskContext, accountId: string) {
+  return invoiceClient.list({ accountId });
+});
+
+// Inside this.task.task({ execute: async (input, ctx) => { ... } })
+const rows = await fetchInvoices(ctx, input.accountId);
+```
+
+**Semantics:**
+
+- **`step(fn)` returns a memoised wrapper.** The binding to the native engine is **lazy**: it resolves on **first invocation** inside a running task, after the task service has initialised. Calling the wrapper **before** `createApp` has booted the task service throws `InitializationError`.
+- The engine keys checkpoints using the wrapped function's **`Function.name`**. **Anonymous arrow functions all share an empty name and can collide.** **MUST** use **named function expressions** or **named async functions** for anything you wrap with `step`.
+- On recovery, completed steps short-circuit to cached results — use for expensive or non-idempotent segments.
+
+**MUST** ensure step bodies are safe to skip on replay when already recorded (idempotent reads, or deduped writes).
+
+**NEVER** call `this.task.start()` from inside a `step` body or another task handler in a way that creates unbounded nested task fan-out. Orchestrate with separate tasks or explicit emits.
+
+**SHOULD** keep `step` units small and name them after their effect. Engine step events use the `custom:step:…` pattern on the WAL; the **`executeTask` bridge drops `custom:step:*` events** (WAL-only checkpoints, not client SSE).
+
+---
+
+## 6. `executeTask`: The Common Pattern
+
+`this.executeTask(res, taskName, input, settings?)` bridges **`this.task.start` → SSE subscribe loop** for the POST-and-stream pattern.
+
+```typescript
+injectRoutes(router: IAppRouter) {
+ this.route(router, {
+ name: "run",
+ method: "post",
+ path: "/run",
+ handler: async (req, res) => {
+ await this.executeTask(res, "agent-loop", req.body, {
+ cancelOnDisconnect: true,
+ disconnectGraceMs: 5000,
+ telemetry: { traces: true, metrics: true },
+ });
+ },
+ });
+}
+```
+
+**`ExecuteTaskSettings`** (only fields that exist):
+
+- `cancelOnDisconnect?` — default `true`; when `true`, after the client closes the TCP connection the bridge waits **`disconnectGraceMs`** (default **5000** ms) then calls `this.task.stop` with reason `client_disconnected`. Set `cancelOnDisconnect: false` for long OBO runs where you expect `EventSource` reconnects. **Note:** OBO tokens still expire (~1 hour); multi-hour OBO work should use `autoRecover: false` and **`resume` from a fresh auth** before token expiry.
+- `disconnectGraceMs?` — non-negative milliseconds; ignored when `cancelOnDisconnect` is `false`.
+- `telemetry?` — `{ traces?, metrics? }` (defaults `true`).
+
+**Compile-time `never` guards** (do not pass): `retry`, `cache`, `timeout`, `stream`, **`userId`**. The task service replaces retry/cache/timeout; the bridge wire is fixed (no `stream.eventFilter`); identity must never be taken from request bodies.
+
+**Identity / OBO.** The bridge passes `userId` and `context` from **`getCurrentUserContext()`** into `this.task.start`. For OBO, call through the proxy: **`await this.asUser(req).executeTask(res, …)`**. For service principal, call `this.executeTask` without entering user context.
+
+**Wire behaviour (in order; an illustrative transcript follows the list):**
+
+1. `this.task.start(taskName, input, { userId, context })`.
+2. If headers are not yet sent: set **`X-AppKit-Task-Idempotency-Key`**, call `setupSseHeaders(res)`, write **`event: ready`** with the IK in JSON.
+3. `this.task.subscribe(idempotencyKey, lastSeq)` where `lastSeq` comes from **`Last-Event-ID`** on the request.
+4. For each stream event: **heartbeats** become SSE **comments** (`: hb`), **`custom:step:*`** are skipped, reserved custom names are dropped with a warning, other **`custom:`** events are stripped to the short name for `event:`, **`id:`** is set to `streamSeq` for replay, payloads are JSON with BigInt-safe stringification.
+5. On engine terminal types **`completed`**, **`failed`**, **`cancelled`**, the bridge ends the response after forwarding that frame.
+6. On AppKit shutdown, active bridges receive **`event: error`** with **`data: {"message":"server_shutting_down"}`** (best effort) before the iterator closes.
+
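+Putting those rules together, an illustrative (not byte-exact) stream for a task that emits one custom event and completes — `id` values stand in for `streamSeq`:
+
+```text
+event: ready
+data: {"idempotencyKey":"…"}
+
+: hb
+
+id: 3
+event: stage_done
+data: {"stage":"extract"}
+
+id: 4
+event: completed
+data: {"result":{"ok":true}}
+```
+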
+**MUST NOT** pass `retry`, `cache`, `timeout` (interceptor sense), `stream`, or `userId` in settings — they are rejected by types.
+
+**MUST NOT** call `this.executeTask` outside a request handler without an Express `Response` you own. From `setup`, cron, or workers, use **`this.requireTask().start`** / **`subscribe`** directly (and set `SubmitOptions.context` yourself if OBO).
+
+**SHOULD** control what the client sees **only** via `ctx.emit` names and payloads (optionally typed with `TaskDefinition` generics).
+
+---
+
+## 7. OBO (`asUser`) with the task service
+
+**MUST** use `await this.asUser(req).executeTask(...)` when the task body calls plugin code that depends on **`runInUserContext`** (warehouse OBO, etc.). The bridge forwards the live **`UserContext`** as `ctx.context` — it is **never** stored in SQLite; it exists only for the current attempt.
+
+**NEVER** pass a fake `userId` through settings (the field does not exist) or trust client-supplied identity for resume/stop.
+
+**MUST** for OBO tasks that must survive process restart **register with `autoRecover: false`** and design **`this.task.resume(idempotencyKey, { userId, context })`** from a **fresh authenticated request** that re-establishes **`UserContext`**:
+
+```typescript
+// setup — OBO-capable task; no automatic recovery without context
+this.task.task({
+ name: `${this.name}:query`,
+ execute: (input, ctx) => this._runQuery(input, ctx),
+ autoRecover: false,
+});
+
+// Fresh authenticated route: resume must pass the same userId as submit time, and for OBO
+// a live context object — mirror Plugin.asUser(req) (ServiceContext + token headers).
+// See packages/appkit/src/plugins/analytics/analytics.ts (`_runQueryTask`) for the throw-if-missing contract.
+await this.requireTask().resume(ikFromClient, {
+ userId: this.resolveUserId(req),
+ // OBO: add `context` — the live `UserContext` for this request (what `executeTask` forwards).
+});
+```
+
+The **analytics plugin** is the reference: `_runQueryTask` uses **`autoRecover: false`** and **throws** if `input.isAsUser` is true but `ctx.context` is missing (resume/recovery without `context: req` would otherwise risk running as the wrong principal).
+
+**MUST** pass **`context`** compatible with your handler when resuming OBO work — typically by routing resume through the same `asUser(req)` machinery so `executeTask` or your resume callsite captures `ServiceContext.createUserContext(...)`.
+
+**SHOULD** scope any **application-level cache** keys with `getCurrentUserId()` for OBO and a stable SP marker for service tasks.
+
+---
+
+## 8. Recovery Patterns (Cookbook)
+
+Use the **`custom:`** prefix when scanning `ctx.previousEvents`.
+
+### 8a. Agentic Loop
+
+Persist conversation state in your DB; use events for streaming only.
+
+```typescript
+this.task.task({
+ name: "chat",
+ execute: async (input, ctx) => {
+ if (ctx.isRecovery) {
+ await ctx.emit("recovered", { turns: await this.countDbTurns(input.sessionId) });
+ }
+ // …
+ },
+ autoRecover: true,
+});
+```
+
+### 8b. Staged Pipeline
+
+```typescript
+async pipeline(input: PipelineInput, ctx: TaskContext) {
+ const completed = new Set(
+ ctx.previousEvents
+ .filter(e => e.eventType === "custom:stage_done")
+ .map(e => (e.payload as { stage: string }).stage),
+ );
+ if (!completed.has("extract")) {
+ await this.extract(input);
+ await ctx.emit("stage_done", { stage: "extract" });
+ }
+ // …
+}
+```
+
+### 8c. Saga (Forward + Compensate)
+
+```typescript
+async saga(input: SagaInput, ctx: TaskContext) {
+ const completed = ctx.previousEvents
+ .filter(e => e.eventType === "custom:step_done")
+ .map(e => e.payload as { name: string; context: unknown });
+ const failed = ctx.previousEvents.find(e => e.eventType === "custom:step_failed");
+ // …
+}
+```
+
+---
+
+## 9. Errors, Suspension, and Shutdown
+
+**MUST** handle failures from **`this.task.stop`**, **`resume`**, and **`start`** explicitly in production code (`try/catch`, typed error inspection). The vendored `Engine` throws on some invalid transitions; there is no stable public `isTaskPauseError` helper exported from `@databricks/appkit` today — branch on message / name cautiously or map errors in your layer.
+
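+A defensive sketch (the `reason` option and the error-message patterns are assumptions — map engine errors in your own layer):
+
+```typescript
+try {
+  await this.requireTask().stop(idempotencyKey, { reason: "user_requested" });
+} catch (err) {
+  // No stable typed helper is exported today; branch cautiously.
+  if (err instanceof Error && /terminal|transition/i.test(err.message)) {
+    // Likely benign: the task already finished or was cancelled.
+  } else {
+    throw err;
+  }
+}
+```
+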
+**MUST** rely on AppKit graceful shutdown: **`TaskManager.shutdown()`** drains active SSE bridges (**`server_shutting_down`**) then shuts down the native engine. Plugins **MUST NOT** call low-level shutdown from arbitrary hooks unless you know you are replacing AppKit lifecycle.
+
+**SHOULD** call **`this.streamManager.abortAll()`** in **`shutdown()`** if your plugin uses **`executeStream`** alongside the task service SSE — `executeTask` subscriptions are managed by the task service bridge registry.
+
+---
+
+## 10. NEVER Rules (Compile-Free Footguns)
+
+**NEVER** call **`this.task.start()`** from inside another task in a way that creates unbounded nested task explosions without a clear orchestration model.
+
+**NEVER** mutate logical **`input`** fields between attempts to smuggle in recovery state — the engine replays stored input. If you need derived state, recompute it from `previousEvents` or durable storage.
+
+**NEVER** throw inside **`recover`** expecting a silent full retry — treat throws as failed recovery per engine semantics.
+
+**NEVER** assume **`ctx.context`** is non-null on recovery for **OBO** tasks. Without a fresh **`UserContext`** from **`resume`**, OBO handlers **MUST** fail closed (see analytics plugin). **Recovery that needs OBO `MUST` use `this.task.resume(ik, { userId, context })` from a new authorised HTTP request**, not blind `autoRecover: true`.
+
+**NEVER** use **`simulateCrash`** outside dev/tests. It requires **`engine.enableTestMode: true`** in config and exists to exercise recovery paths.
+
+**NEVER** import the vendored **`Task`** singleton or **`Engine`** directly for plugin business logic. Use **`this.task`** (via **`requireTask()`** when you need a non-null reference) so lifecycle, telemetry, and AppKit guards stay consistent.
+
+**NEVER** emit **reserved** custom event names (section 4) — they are dropped with a warning and never reach the client.
+
+---
+
+## 11. Testing
+
+**MUST** mock the task service in unit tests (for example `vi.mock("@databricks/appkit", …)` swapping the module surface) or inject a fake `TaskManager` if your harness supports it.
+
+**SHOULD** when integration-testing, opt into **`enableTestMode: true`** in the task service config only in tests, then exercise **`this.task.simulateCrash(idempotencyKey)`** and recovery.
+
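+A test-only sketch (the config nesting for `enableTestMode` is assumed from this guide's references to `engine.enableTestMode`):
+
+```typescript
+await createApp({
+  task: { engine: { enableTestMode: true } }, // test config only — never production
+  plugins: [myPlugin],
+});
+const { idempotencyKey } = await myPlugin.requireTask().start("my-op", input);
+await myPlugin.requireTask().simulateCrash(idempotencyKey);
+// …then assert recovery: subscribe again, inspect events, check side effects.
+```
+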
+**SHOULD** assert:
+
+- Handlers do not double-commit side effects when re-run under the same IK.
+- Recovery skips work based on **`custom:*`** events in `previousEvents`.
+- OBO resume without **`context`** fails loudly where your plugin requires it.
+
+---
+
+## 12. Quick Reference
+
+```typescript
+import {
+ step,
+ TASK_IDEMPOTENCY_HEADER,
+ type TaskContext,
+} from "@databricks/appkit";
+
+// setup (inside Plugin subclass)
+async setup() {
+ this.requireTask().task({
+ name: "my-op",
+ execute: (input, ctx) => this.myOp(input, ctx),
+ autoRecover: true,
+ });
+}
+
+// Route — service principal
+this.route(router, {
+ name: "run-sp",
+ method: "post",
+ path: "/run",
+ handler: async (req, res) => {
+ await this.executeTask(res, "my-op", req.body, {
+ cancelOnDisconnect: true,
+ disconnectGraceMs: 5000,
+ });
+ // res.getHeader(TASK_IDEMPOTENCY_HEADER) when headers are readable (same-origin fetch)
+ },
+});
+
+// Route — OBO (executeTask under asUser proxy)
+this.route(router, {
+ name: "run-obo",
+ method: "post",
+ path: "/run-as-user",
+ handler: async (req, res) => {
+ await this.asUser(req).executeTask(res, "my-op", req.body, {
+ cancelOnDisconnect: false,
+ });
+ },
+});
+
+// Handler with recovery inspection (engine stores custom:* on TaskEvent.eventType)
+async myOp(input: MyInput, ctx: TaskContext): Promise<{ ok: boolean }> {
+ const last = ctx.previousEvents.findLast(e => e.eventType === "custom:stage_done");
+ void last;
+ await ctx.emit("stage_done", { stage: 1 });
+ return { ok: true };
+}
+
+// Programmatic consume
+const tf = this.requireTask();
+const handle = await tf.start("my-op", input, { userId: "optional-owner-id" });
+for await (const ev of tf.subscribe(handle.idempotencyKey)) {
+ void ev.event.eventType;
+}
+```
+
+Named step (avoid anonymous `step(async (ctx) => …)` collisions):
+
+```typescript
+const fetchRows = step(async function fetchRows(ctx: TaskContext, id: string) {
+ return lookup(id);
+});
+```
diff --git a/CLAUDE.md b/CLAUDE.md
index eaf504796..2457cede9 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -234,6 +234,60 @@ The SDK has built-in SSE support with automatic reconnection:
- Plugin name as default tracer/meter scope
- Supports traces, metrics, logs (configurable per plugin)
+### Durable Task Service
+
+A built-in **durable execution service** available to every plugin via `this.task`. Use it for any operation that needs to survive process restart: long-running queries, agent loops with expensive LLM calls, multi-step pipelines, sagas, async user-facing operations.
+
+**Mental model:** a durable function with an event log. Handlers emit structured events during execution, those events are appended to a write-ahead log, and on crash the recovery worker re-spawns the handler with `previousEvents` so it can resume from the last meaningful checkpoint. There is no workflow DSL, no replay engine, no determinism constraint on the handler.
+
+**Three primitives:**
+- **Event log** — handlers emit checkpoints/intermediate results via `ctx.emit(name, payload)`. Each event is sequenced and committed to the WAL. Same log doubles as a real-time SSE stream for clients.
+- **Smart recovery** — opt-in per task. If the task crashes, `recover` (or the same handler with `ctx.isRecovery = true`) gets `ctx.previousEvents` and decides what "resume" means.
+- **WAL-first durability** — every state transition appends to a sequential WAL before anything else. A background worker batches flushes to SQLite. Sub-millisecond per-event latency, with explicit-flush available when correctness needs strict persistence.
+
+**Bootstrap:** the task service is initialized by `createApp` automatically. Default storage is SQLite at `.appkit/tasks/tasks.db` and WAL at `.appkit/tasks/wal`. SQLite storage is **ephemeral on Databricks Apps** (multi-pod, no shared volume) — the runtime logs a startup warning and falls back to "tasks die with the pod" semantics. For durability in Apps, configure `task: { storage: { backend: 'lakebase', connectionString: … } }`. Opt out entirely with `createApp({ task: false, plugins: [...] })`; `this.task` is then `null` and plugins must handle that case.
+
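+A minimal sketch of the Apps-durable configuration (the env var name is illustrative):
+
+```typescript
+createApp({
+  task: {
+    storage: {
+      backend: "lakebase",
+      connectionString: process.env.LAKEBASE_DATABASE_URL!, // hypothetical env var name
+    },
+  },
+  plugins: [/* plugins */],
+});
+```
+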
+**Public API on `this.task`:**
+```typescript
+this.task.task({ name, execute, recover?, autoRecover? }) // register a durable task in setup()
+this.task.start(name, input, opts) // spawn an attempt; returns { idempotencyKey }
+this.task.subscribe(idempotencyKey, lastSeq?) // async iterable of TaskEvent
+this.task.resume(idempotencyKey, opts?) // resume a paused/suspended task
+this.task.stop(idempotencyKey, opts?) // request cooperative cancellation
+```
+
+**The `executeTask` shortcut.** For the common "POST starts a durable task, GET streams its progress over SSE" pattern, use `this.executeTask(res, taskName, input, settings?)` from any route handler. It calls `start`, bridges `subscribe` to the SSE response with `Last-Event-ID` reconnection support, captures the request context for OBO automatically, and dedupes against existing tasks by idempotency key. **`executeTask` deliberately omits the `retry`, `cache`, and `timeout` knobs that `execute()` accepts** — the engine replaces those with stronger primitives (smart recovery, idempotency-key dedup, cooperative `stop`). Compile-time error if you try to pass them.
+
+**When to use which execution method:**
+
+| Method | Use for | Durability | Recovery |
+|---|---|---|---|
+| `this.execute(fn, settings)` | Sub-second to ~5s reads with retry/cache/timeout | None | None (retry from scratch) |
+| `this.executeStream(res, gen, settings)` | Live progress for short ops; no crash survival | None | None |
+| `this.executeTask(res, name, input)` | Anything that should survive restart, agentic loops, long pipelines | WAL → SQLite | Smart, opt-in via `recover` |
+
+**Recovery snippet:**
+```typescript
+this.task.task({
+ name: "agent-loop",
+ autoRecover: true,
+  execute: async (input: ChatInput, ctx): Promise<{ turns: number }> => {
+    if (ctx.isRecovery) {
+      // Find the last successful checkpoint and resume from there.
+      const lastTurn = ctx.previousEvents.findLast(e => e.eventType === "custom:turn_done");
+      const lastDone = (lastTurn?.payload as { turn: number } | undefined)?.turn;
+      input = { ...input, resumeFromTurn: lastDone ?? 0 };
+    }
+    for (let turn = input.resumeFromTurn ?? 0; turn < MAX_TURNS; turn++) {
+      const result = await this.callLlm(ctx, input);
+      await ctx.emit("turn_done", { turn, result });
+      if (result.stop_reason !== "tool_use") return { turns: turn + 1 };
+    }
+    return { turns: MAX_TURNS };
+  },
+});
+```
+
+**OBO (asUser) is automatic** when calling `executeTask` from inside a route handler — the bridge captures the active `UserContext` via the `asUser(req)` proxy and forwards it to the handler as `ctx.context`. Inside the handler, `asUser`-style calls against `ctx.context` work exactly like in a normal route. **Never pass `context: req` manually** — `executeTask` does it for you. `autoRecover: true` is incompatible with OBO (the recovery worker has no UserContext after restart) and produces a one-time runtime warning at the first OBO `executeTask` call for a misconfigured task; use `autoRecover: false` + explicit `this.task.resume()` from a fresh authenticated request instead.
+
### Analytics Query Pattern
The AnalyticsPlugin provides SQL query execution:
@@ -402,6 +456,7 @@ This project uses conventional commits (enforced by commitlint):
### Key Dependencies
- `@databricks/sdk-experimental` v0.16.0 - Databricks services SDK
+- `@databricks/taskflow` (vendored at `packages/appkit/vendor/taskflow/`) - Durable execution engine (Rust + Node.js FFI). Bootstraps automatically with AppKit. Vendored artifacts are pinned in `packages/appkit/vendor/taskflow/VENDOR.json` (sha256 per platform); refresh by re-running the upstream `taskflow` build and copying outputs into the vendor directory, then rerun `pnpm test` with `APPKIT_VERIFY_TASKFLOW_VENDOR=1` to re-derive the digests.
- `express` - HTTP server
- `zod` - Runtime validation
- `OpenTelemetry` - Observability (traces, metrics, logs)
@@ -412,9 +467,12 @@ This project uses conventional commits (enforced by commitlint):
3. **Streaming-first** - Built-in SSE support with reconnection
4. **Observability** - OpenTelemetry integration is first-class
5. **Dev Experience** - HMR, hot-reload, source maps, inspection tools
+6. **Durable by default for long ops** - Any operation that should survive process restart belongs on the task service (`this.task` / `this.executeTask`). Don't reinvent durability with retry-and-disk.
### Graceful Shutdown
The server handles SIGTERM/SIGINT with:
-- 15-second timeout
-- Aborts in-flight operations
-- Closes connections gracefully
+- 15-second timeout (force-exit deadline)
+- Re-entrancy guard: the second signal during a Ctrl-C race is ignored
+- Aborts in-flight operations (SSE streams release HTTP connections first)
+- Drains core services in reverse boot order — the task manager walks active `executeTask` bridges and writes a final `event: error / server_shutting_down` frame so clients see a clean signal instead of an EOF
+- Closes HTTP listener last
diff --git a/packages/appkit/src/tasks/execute-task.ts b/packages/appkit/src/tasks/execute-task.ts
index ab32b43b9..8cf7b4d06 100644
--- a/packages/appkit/src/tasks/execute-task.ts
+++ b/packages/appkit/src/tasks/execute-task.ts
@@ -35,6 +35,19 @@ import type { ActiveBridge } from "./types";
*/
export const TASK_IDEMPOTENCY_HEADER = "X-AppKit-Task-Idempotency-Key";
+/**
+ * Wall-clock idle keep-alive interval, in milliseconds. The engine
+ * already emits its own `heartbeat` events when the executor is
+ * actively running, but a task can be quiet for several seconds at a
+ * time (e.g. waiting on a slow downstream call, or stopped/paused
+ * while a client is still subscribed). 25 s sits comfortably under
+ * the typical 60 s idle-socket timeout enforced by AWS ELBs,
+ * Cloudflare, GCP HTTPS LBs, and most corporate proxies.
+ *
+ * @internal
+ */
+export const IDLE_KEEPALIVE_INTERVAL_MS = 25_000;
+
const logger = createLogger("tasks:execute");
/** Process-scoped to log the OBO+autoRecover warning once per (plugin, task). @internal */
@@ -95,6 +108,12 @@ export async function executeTask(
// Hoisted so the outer `finally` can release the bridge regardless
// of which branch unwinds.
let unregisterBridge: (() => void) | null = null;
+ // Wall-clock idle keep-alive timer. Belt-and-braces on top of the
+ // engine `heartbeat` event stream: if the engine stays silent for
+ // more than {@link IDLE_KEEPALIVE_INTERVAL_MS} we still write a
+ // comment frame so an idle load balancer doesn't drop the socket
+ // mid-task. Cleared from the outer `finally`.
+  let idleKeepAlive: ReturnType<typeof setInterval> | null = null;
try {
// `context` is the live executor sidecar — never serialised, never
// seen by recovery. Lets the task body re-enter `runInUserContext`
@@ -184,6 +203,16 @@ export async function executeTask(
};
unregisterBridge = manager._registerBridge(bridge);
+ // Install the wall-clock keep-alive. `unref()` so this timer
+ // doesn't keep the process alive on its own — if everything else
+ // has shut down, the SSE response is already in the process of
+ // closing and there's nothing to keep alive.
+ idleKeepAlive = setInterval(() => {
+ if (res.writableEnded) return;
+ writeSseComment(res, "hb");
+ }, IDLE_KEEPALIVE_INTERVAL_MS);
+ idleKeepAlive.unref?.();
+
for await (const streamEvent of manager.subscribe(
idempotencyKey,
lastEventId,
@@ -299,6 +328,7 @@ export async function executeTask(
);
throw err;
} finally {
+ if (idleKeepAlive) clearInterval(idleKeepAlive);
unregisterBridge?.();
span?.end();
}
diff --git a/packages/appkit/src/tasks/tests/execute-task.test.ts b/packages/appkit/src/tasks/tests/execute-task.test.ts
new file mode 100644
index 000000000..50b10671a
--- /dev/null
+++ b/packages/appkit/src/tasks/tests/execute-task.test.ts
@@ -0,0 +1,175 @@
+/**
+ * Tests for the durable `executeTask` SSE bridge.
+ *
+ * Most behaviour of the bridge is covered indirectly by the analytics
+ * plugin's integration tests (wire-shape, recovery, terminal events).
+ * The cases below isolate the production-hardening primitives that
+ * have no other test coverage:
+ *
+ * - the wall-clock idle keep-alive: it fires once the engine stream
+ *   has been silent for `IDLE_KEEPALIVE_INTERVAL_MS`, and the interval
+ *   is cleared when the bridge exits.
+ */
+
+import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
+import { createStubTaskManager } from "../../../../../tools/test-helpers";
+import { executeTask, IDLE_KEEPALIVE_INTERVAL_MS } from "../execute-task";
+import type { TaskManager } from "../index";
+
+type SsePiece = string;
+
+/**
+ * Minimal `express.Response` stand-in: captures every chunk written so
+ * the test can assert SSE framing without spinning up a real server.
+ */
+function createMockResponse() {
+ const chunks: SsePiece[] = [];
+ let writableEnded = false;
+ let headersSent = false;
+  const headers: Record<string, string> = {};
+
+ const req = {
+ once: vi.fn(),
+ header: vi.fn(() => undefined),
+ };
+
+ const res = {
+ req,
+ get statusCode() {
+ return 200;
+ },
+ set statusCode(_v: number) {},
+ get headersSent() {
+ return headersSent;
+ },
+ get writableEnded() {
+ return writableEnded;
+ },
+ setHeader: vi.fn((k: string, v: string) => {
+ headers[k] = v;
+ }),
+ flushHeaders: vi.fn(() => {
+ headersSent = true;
+ }),
+ write: vi.fn((chunk: string) => {
+ chunks.push(chunk);
+ return true;
+ }),
+ end: vi.fn(() => {
+ writableEnded = true;
+ }),
+ status: vi.fn(function status(this: unknown, _code: number) {
+ return res;
+ }),
+ json: vi.fn(),
+ } as unknown as import("express").Response;
+
+ return { res, chunks, headers, req };
+}
+
+/**
+ * Minimal `ITelemetry` stand-in: returns no tracer, so the bridge skips
+ * span work entirely. We just need the surface area for `deps`.
+ */
+function createNoopTelemetry() {
+ return {
+ getTracer: () => null,
+ getMeter: () => null,
+ getLogger: () => null,
+ } as unknown as import("../../telemetry").ITelemetry;
+}
+
+describe("executeTask idle keep-alive", () => {
+ beforeEach(() => {
+ vi.useFakeTimers();
+ });
+
+ afterEach(() => {
+ vi.useRealTimers();
+ });
+
+ test("writes an SSE comment after IDLE_KEEPALIVE_INTERVAL_MS while engine is silent", async () => {
+ const stub = createStubTaskManager();
+ const managerAsType = stub as unknown as TaskManager;
+ const { res, chunks } = createMockResponse();
+
+ // Handler awaits a deferred we hold open so the engine stream
+ // stays silent past the keep-alive window. The fake-timer clock
+ // can advance past the wall-clock interval without `def.execute`
+ // ever resolving, which is exactly the production-hostile shape
+ // (long downstream call + idle proxy) the keep-alive exists for.
+ let releaseHandler!: () => void;
+    const handlerDone = new Promise<void>((resolve) => {
+ releaseHandler = resolve;
+ });
+ stub.task({
+ name: "silent-task",
+ execute: async () => {
+ await handlerDone;
+ return { ok: true };
+ },
+ });
+
+ const bridge = executeTask(
+ {
+ manager: managerAsType,
+ telemetry: createNoopTelemetry(),
+ pluginName: "test",
+ },
+ res,
+ "silent-task",
+ { x: 1 },
+ { telemetry: { traces: false } },
+ );
+
+ // Flush microtasks so `manager.start` resolves and the keep-alive
+ // interval is installed before we advance time.
+ await vi.advanceTimersByTimeAsync(0);
+
+ // No keep-alive yet: we haven't reached the interval boundary.
+ expect(chunks.some((c) => c.startsWith(": hb"))).toBe(false);
+
+ // Wall-clock keep-alive should fire once per interval window.
+ await vi.advanceTimersByTimeAsync(IDLE_KEEPALIVE_INTERVAL_MS);
+ const afterFirstFire = chunks.filter((c) => c.startsWith(": hb")).length;
+ expect(afterFirstFire).toBeGreaterThanOrEqual(1);
+
+ await vi.advanceTimersByTimeAsync(IDLE_KEEPALIVE_INTERVAL_MS);
+ const afterSecondFire = chunks.filter((c) => c.startsWith(": hb")).length;
+ expect(afterSecondFire).toBeGreaterThan(afterFirstFire);
+
+ // Release the handler so the bridge can finish cleanly and the
+ // outer `finally` clears the interval.
+ releaseHandler();
+ await bridge;
+ });
+
+ test("clears the keep-alive interval when the bridge exits", async () => {
+ const stub = createStubTaskManager();
+ const managerAsType = stub as unknown as TaskManager;
+ const { res, chunks } = createMockResponse();
+ stub.task({
+ name: "fast-task",
+ execute: async () => ({ ok: true }),
+ });
+
+ await executeTask(
+ {
+ manager: managerAsType,
+ telemetry: createNoopTelemetry(),
+ pluginName: "test",
+ },
+ res,
+ "fast-task",
+ {},
+ { telemetry: { traces: false } },
+ );
+
+ // After the bridge exits, the keep-alive must not continue to
+ // fire — advancing a full interval should add no new `: hb`
+ // frames.
+ const baseline = chunks.filter((c) => c.startsWith(": hb")).length;
+ await vi.advanceTimersByTimeAsync(IDLE_KEEPALIVE_INTERVAL_MS * 2);
+ const after = chunks.filter((c) => c.startsWith(": hb")).length;
+ expect(after).toBe(baseline);
+ });
+});