diff --git a/.claude/references/tasks.md b/.claude/references/tasks.md
new file mode 100644
index 000000000..a9d2dfe11
--- /dev/null
+++ b/.claude/references/tasks.md
@@ -0,0 +1,404 @@

# Durable Task Best Practices

Reference guide for using the durable task service inside AppKit plugins. AppKit ships a built-in **durable execution service**: every plugin gets `this.task` (unless `createApp({ task: false })`) and `this.executeTask` on `Plugin`.

Every guideline is prefixed with a severity tier:

- **NEVER** — Security, correctness, or breakage blocker. Violating this corrupts state, double-charges users, or loses durability guarantees.
- **MUST** — Correctness requirement. Violating this produces bugs, broken recovery, or inconsistent behavior.
- **SHOULD** — Quality recommendation. Violating this degrades DX, performance, or maintainability.

> **Scope:** the task service used from inside core or custom AppKit plugins (`packages/appkit/src/plugins/` or external). For the underlying engine architecture, see the upstream engine repo's `docs/ARCHITECTURE.md` and `docs/ONE_PAGER.md`.

**Imports.** AppKit surfaces task types and helpers from `@databricks/appkit` (for example `step`, `TaskDefinition`, `TypedTaskContext`, `TaskContext`, `TASK_IDEMPOTENCY_HEADER`, `setupSseHeaders`, `writeSseFrame`). The vendored native engine lives under `packages/appkit/vendor/taskflow/`; do not import the vendored `Taskflow` facade from that path inside plugins — use `this.task` instead.

---

## 1. When to Use Durable Tasks

The decision is binary. Reach for durable tasks when **at least one** of these is true:

- The operation runs longer than ~5 seconds and the user/client expects to see progress.
- Re-running from scratch on crash is **incorrect** (side effects already persisted) or **expensive** (compute, money, time).
- The operation must survive process restart (rolling deploy, crash, host reclaim).
- Multiple steps each have their own success/failure semantics that need to be tracked.

**MUST** use `this.executeTask(res, name, input, settings?)` — not `this.execute()` — for any operation matching the above. Do not try to reinvent durability with the `retry` interceptor plus ad-hoc disk writes.

**MUST NOT** use it for sub-second reads, stateless transformations, or fire-and-forget logging. The WAL append cost is small, but the conceptual overhead (task naming, idempotency keys, recovery) is not. Use `this.execute()` with retry/cache/timeout interceptors instead.

| Method | Latency budget | Crash survival | Use when |
|---|---|---|---|
| `this.execute(fn, settings)` | < 5s typical | None | Read with retry/cache/timeout |
| `this.executeStream(res, gen, settings)` | seconds to minutes | None | Live progress; loss-on-crash acceptable |
| `this.executeTask(res, name, input, settings?)` | seconds to hours | Full | Anything you would be sad to re-run from scratch |

---

## 2. Task Registration

**MUST** register every durable task in `setup()`, never in `injectRoutes()` or constructors. `setup()` runs once after plugins boot; route handlers run per request.

**MUST** pass a single **`TaskDefinition`** object to `this.task.task(definition)`. The shipped shape is `{ name, execute, recover?, autoRecover? }`, not separate positional arguments.
```typescript
import type { TypedTaskContext } from "@databricks/appkit";

async setup() {
  this.task.task({
    name: "agent-loop",
    execute: (input, ctx) => this.runAgentLoop(input, ctx),
    recover: (input, ctx) => this.recoverAgentLoop(input, ctx),
    autoRecover: true,
  });
  this.task.task({
    name: "export-data",
    execute: (input, ctx) => this.exportData(input, ctx),
    autoRecover: true,
  });
}
```

**MUST** use lowercase kebab-case (or a consistent colon-separated prefix convention such as `my-plugin:query`) for the `name` field. The string you pass is the task name the engine stores; AppKit does not auto-prefix it. Multiple plugins should choose names that do not collide.

**NEVER** register the same `name` twice expecting two handlers. The service **warns** and the new registration **replaces** the previous one; recovery and routing become confusing.

**NEVER** change a task's `name` across releases without a migration plan. Names are persisted in the WAL — renaming orphans in-flight tasks across a deploy.

**MUST** understand `autoRecover`: the service records `definition.autoRecover ?? true`. So **`true` is the default for all tasks**, including OBO tasks. There is no special default that flips off for OBO. For OBO workloads, you **MUST** set `autoRecover: false` yourself: the recovery worker has no `UserContext`, so automatic recovery will break `asUser`-style calls after restart.

**MUST** treat the runtime warning in `executeTask` honestly: if you register an OBO task with `autoRecover: true` (explicitly or by default), AppKit logs a **once-per-(plugin, task)** warning that recovery after restart will run without the original `UserContext`. That is guidance, not a change to the default — **fix the registration** (`autoRecover: false` + explicit `resume`) rather than relying on the warning.

**SHOULD** provide `recover` when re-running `execute` from scratch would be expensive or unsafe. Without `recover`, the engine may still re-invoke the handler on recovery per engine semantics; `recover` is your typed hook when you need custom resumption logic.

**SHOULD** use `this.task.task({ ... })` with a third generic (`Events`) so `ctx.emit` is tied to a known event map — the same names and payload shapes are what the SSE bridge exposes to clients.

---

## 3. Idempotency Keys

The engine derives each task's idempotency key (IK) from the **task name**, a **canonical form of the input**, and the **submit-time `userId`** (from the active user context when OBO, or absent for service-principal runs). Plugin code does **not** pass an IK override through `executeTask` — `ExecuteTaskSettings` has no `idempotencyKey` field.

**MUST** treat the IK as the **identity of the logical task**. Duplicate submits with the same name + input + owner dedupe per the engine's `executeMode` (`SubmitOptions`).

**MUST** design `input` so that everything that should distinguish two logical runs is **on the input object**. Example: the analytics plugin includes `queryKey`, statement, parameters, executor key, and format discriminator so distinct client operations do not collide.

**SHOULD** return the IK to clients when they need reconnect or `/resume` / `/stop` follow-ups. The `executeTask` bridge sets the HTTP header **`TASK_IDEMPOTENCY_HEADER`** (`"X-AppKit-Task-Idempotency-Key"`) and sends an initial SSE frame `event: ready` with `data: {"idempotencyKey":...}` so cross-origin `EventSource` clients that cannot read headers still get the key. Import `TASK_IDEMPOTENCY_HEADER` from `@databricks/appkit` if you read it in middleware or tests.

**NEVER** assume you can forge ownership by passing a user id from the client. `ExecuteTaskSettings` explicitly forbids a `userId` field (typed `never`) — identity comes only from `runInUserContext` / `asUser(req)`.

**NEVER** fold attempt counters, retry numbers, or timestamps into inputs when they would change what should be the **same** logical task — that creates accidental IK churn. Conversely, do not omit fields from input that should separate two runs.

**SHOULD** switch the storage backend for real multi-pod deployment: if the runtime looks like **Databricks Apps** (`DATABRICKS_APP_NAME`, `DATABRICKS_APP_ID`, or `DATABRICKS_APP_URL`) and you still use the default **SQLite** backend, AppKit logs a **WARN** that tasks will not survive rolling restarts; plan for **`lakebase`** (Postgres) or another shared backend via `createApp({ task: { storage: … } })`.

---

## 4. Handler Signature

The handler signature is fixed on `TaskDefinition.execute`:

```typescript
execute(input: TInput, ctx: TypedTaskContext<TEvents>): Promise<TOutput>
```

**`ctx` provides** (engine `TaskContext`, narrowed for `emit` when you use `TypedTaskContext`):

| Field | Type | Use |
|---|---|---|
| `ctx.emit(name, payload)` | `(name: string, payload: unknown) => Promise<void>` | Append a user event; becomes SSE after bridge rules below |
| `ctx.isRecovery` | `boolean` | `true` if this invocation is a recovery path |
| `ctx.previousEvents` | `TaskEvent[]` | Historical events for this task; inspect in recovery |
| `ctx.context` | `unknown \| null` | Live sidecar (e.g. `UserContext`); **not** persisted across crash |
| `ctx.idempotencyKey` | `string` | IK for this task |
| `ctx.attempt` | `number` | Attempt counter |
| `ctx.taskId`, `ctx.userId`, `ctx.heartbeat()` | … | See engine types in `task.d.ts` |

**MUST** treat the handler as a function that may run in another process after failure. Durable state must come from `input`, `ctx.previousEvents`, or external storage — not from closures that assume a single long-lived Node process.

**MUST** branch on `ctx.isRecovery` when semantics differ after restart.

**MUST** await every `ctx.emit()` call.

**MUST**, when reading `ctx.previousEvents`, compare against engine **`eventType` strings as stored**, not the short names you passed to `emit`. User emits are persisted with a **`custom:`** prefix — e.g. `ctx.emit("tick", …)` produces `eventType: "custom:tick"`. The **`executeTask` SSE bridge strips `custom:` for frames sent to the client**, but **`previousEvents` in the handler still use the prefixed form.**

**NEVER** emit custom events whose **short name** (after `custom:` is stripped for wire purposes) matches any reserved bridge or terminal name. If you `ctx.emit` one of these, the bridge **logs a warning and drops** the frame so clients do not get misleading terminal or control events:

- `ready`, `error`, `heartbeat`, `completed`, `failed`, `cancelled`, `suspended`

Choose alternatives such as `task_completed` or `query_tick`.

**SHOULD** keep payloads JSON-friendly. The bridge serialises with `JSON.stringify` and a **BigInt** replacer (values become **decimal strings** on the wire) so warehouse `LONG`/`BIGINT` columns do not throw at serialisation time.

---

## 5. The `step()` Helper (Not a Decorator)

AppKit exports **`step`** as a **higher-order function** from `@databricks/appkit`. There is **no** `@step` decorator on `Plugin`.
```typescript
import { step, type TaskContext } from "@databricks/appkit";

// Named async function expression: the engine keys step checkpoints by
// `Function.name`, so the wrapped function needs a stable name. Note a
// named `function` expression does not capture `this` — reference
// collaborators through closure scope instead.
const fetchInvoices = step(async function fetchInvoices(
  ctx: TaskContext,
  accountId: string,
) {
  return invoiceClient.list({ accountId });
});

// Inside this.task.task({ execute: async (input, ctx) => { ... } })
const rows = await fetchInvoices(ctx, input.accountId);
```

**Semantics:**

- **`step(fn)` returns a memoised wrapper.** The binding to the native engine is **lazy**: it resolves on **first invocation** inside a running task, after the task service has initialised. Calling the wrapper **before** `createApp` has booted the task service throws `InitializationError`.
- The engine keys checkpoints using the wrapped function's **`Function.name`**. **Anonymous arrow functions all share an empty name and can collide.** **MUST** use **named function expressions** or **named async functions** for anything you wrap with `step`, as above.
- On recovery, completed steps short-circuit to cached results — use for expensive or non-idempotent segments.

**MUST** ensure step bodies are safe to skip on replay when already recorded (idempotent reads, or deduped writes).

**NEVER** call `this.task.start()` from inside a `step` body or another task handler in a way that creates unbounded nested task fan-out. Orchestrate with separate tasks or explicit emits.

**SHOULD** keep `step` units small and name them after their effect. Engine step events use the `custom:step:…` pattern on the WAL; the **`executeTask` bridge drops `custom:step:*` events** (WAL-only checkpoints, not client SSE).

---

## 6. `executeTask`: The Common Pattern

`this.executeTask(res, taskName, input, settings?)` bridges **`this.task.start` → SSE subscribe loop** for the POST-and-stream pattern.

```typescript
injectRoutes(router: IAppRouter) {
  this.route(router, {
    name: "run",
    method: "post",
    path: "/run",
    handler: async (req, res) => {
      await this.executeTask(res, "agent-loop", req.body, {
        cancelOnDisconnect: true,
        disconnectGraceMs: 5000,
        telemetry: { traces: true, metrics: true },
      });
    },
  });
}
```

**`ExecuteTaskSettings`** (only fields that exist):

- `cancelOnDisconnect?` — default `true`; when `true`, after the client closes the TCP connection the bridge waits **`disconnectGraceMs`** (default **5000** ms) then calls `this.task.stop` with reason `client_disconnected`. Set `cancelOnDisconnect: false` for long OBO runs where you expect `EventSource` reconnects. **Note:** OBO tokens still expire (~1 hour); multi-hour OBO work should use `autoRecover: false` and **`resume` from a fresh auth** before token expiry.
- `disconnectGraceMs?` — non-negative milliseconds; ignored when `cancelOnDisconnect` is `false`.
- `telemetry?` — `{ traces?, metrics? }` (defaults `true`).

**Compile-time `never` guards** (do not pass): `retry`, `cache`, `timeout`, `stream`, **`userId`**. The task service replaces retry/cache/timeout; the bridge wire is fixed (no `stream.eventFilter`); identity must never be taken from request bodies.

**Identity / OBO.** The bridge passes `userId` and `context` from **`getCurrentUserContext()`** into `this.task.start`. For OBO, call through the proxy: **`await this.asUser(req).executeTask(res, …)`**. For service principal, call `this.executeTask` without entering user context.

**Wire behaviour (in order):**

1. `this.task.start(taskName, input, { userId, context })`.
2. If headers are not yet sent: set **`X-AppKit-Task-Idempotency-Key`**, call `setupSseHeaders(res)`, write **`event: ready`** with the IK in JSON.
3. `this.task.subscribe(idempotencyKey, lastSeq)` where `lastSeq` comes from **`Last-Event-ID`** on the request.
4. For each stream event: **heartbeats** become SSE **comments** (`: hb`), **`custom:step:*`** are skipped, reserved custom names are dropped with a warning, other **`custom:`** events are stripped to the short name for `event:`, **`id:`** is set to `streamSeq` for replay, payloads are JSON with BigInt-safe stringification.
5. On engine terminal types **`completed`**, **`failed`**, **`cancelled`**, the bridge ends the response after forwarding that frame.
6. On AppKit shutdown, active bridges receive **`event: error`** with **`data: {"message":"server_shutting_down"}`** (best effort) before the iterator closes.

**MUST NOT** pass `retry`, `cache`, `timeout` (interceptor sense), `stream`, or `userId` in settings — they are rejected by types.

**MUST NOT** call `this.executeTask` outside a request handler without an Express `Response` you own. From `setup`, cron, or workers, use **`this.requireTask().start`** / **`subscribe`** directly (and set `SubmitOptions.context` yourself if OBO).

**SHOULD** control what the client sees **only** via `ctx.emit` names and payloads (optionally typed with `TaskDefinition` generics).

---

## 7. OBO (`asUser`) with the Task Service

**MUST** use `await this.asUser(req).executeTask(...)` when the task body calls plugin code that depends on **`runInUserContext`** (warehouse OBO, etc.). The bridge forwards the live **`UserContext`** as `ctx.context` — it is **never** stored in SQLite; it exists only for the current attempt.

**NEVER** pass a fake `userId` through settings (the field does not exist) or trust client-supplied identity for resume/stop.

**MUST**, for OBO tasks that must survive process restart, **register with `autoRecover: false`** and drive **`this.task.resume(idempotencyKey, { userId, context })`** from a **fresh authenticated request** that re-establishes **`UserContext`**:

```typescript
// setup — OBO-capable task; no automatic recovery without context
this.task.task({
  name: `${this.name}:query`,
  execute: (input, ctx) => this._runQuery(input, ctx),
  autoRecover: false,
});

// Fresh authenticated route: resume must pass the same userId as submit time, and for OBO
// a live context object — mirror Plugin.asUser(req) (ServiceContext + token headers).
// See packages/appkit/src/plugins/analytics/analytics.ts (`_runQueryTask`) for the throw-if-missing contract.
await this.requireTask().resume(ikFromClient, {
  userId: this.resolveUserId(req),
  // OBO: add `context` — the live `UserContext` for this request (what `executeTask` forwards).
});
```

The **analytics plugin** is the reference: `_runQueryTask` uses **`autoRecover: false`** and **throws** if `input.isAsUser` is true but `ctx.context` is missing (resume/recovery without `context: req` would otherwise risk running as the wrong principal).

**MUST** pass a **`context`** compatible with your handler when resuming OBO work — typically by routing resume through the same `asUser(req)` machinery so `executeTask` or your resume callsite captures `ServiceContext.createUserContext(...)`.

**SHOULD** scope any **application-level cache** keys with `getCurrentUserId()` for OBO and a stable SP marker for service tasks.

---

## 8. Recovery Patterns (Cookbook)

Use the **`custom:`** prefix when scanning `ctx.previousEvents`.

### 8a. Agentic Loop

Persist conversation state in your DB; use events for streaming only.
```typescript
this.task.task({
  name: "chat",
  execute: async (input, ctx) => {
    if (ctx.isRecovery) {
      await ctx.emit("recovered", { turns: await this.countDbTurns(input.sessionId) });
    }
    // …
  },
  autoRecover: true,
});
```

### 8b. Staged Pipeline

```typescript
async pipeline(input: PipelineInput, ctx: TaskContext) {
  const completed = new Set(
    ctx.previousEvents
      .filter(e => e.eventType === "custom:stage_done")
      .map(e => (e.payload as { stage: string }).stage),
  );
  if (!completed.has("extract")) {
    await this.extract(input);
    await ctx.emit("stage_done", { stage: "extract" });
  }
  // …
}
```

### 8c. Saga (Forward + Compensate)

```typescript
async saga(input: SagaInput, ctx: TaskContext) {
  const completed = ctx.previousEvents
    .filter(e => e.eventType === "custom:step_done")
    .map(e => e.payload as { name: string; context: unknown });
  const failed = ctx.previousEvents.find(e => e.eventType === "custom:step_failed");
  // …
}
```

---

## 9. Errors, Suspension, and Shutdown

**MUST** handle failures from **`this.task.stop`**, **`resume`**, and **`start`** explicitly in production code (`try/catch`, typed error inspection). The vendored `Engine` throws on some invalid transitions; there is no stable public `isTaskPauseError` helper exported from `@databricks/appkit` today — branch on message/name cautiously or map errors in your own layer.

**MUST** rely on AppKit graceful shutdown: **`TaskManager.shutdown()`** drains active SSE bridges (**`server_shutting_down`**) then shuts down the native engine. Plugins **MUST NOT** call low-level shutdown from random hooks unless you know you are replacing AppKit lifecycle.

**SHOULD** call **`this.streamManager.abortAll()`** in **`shutdown()`** if your plugin uses **`executeStream`** alongside the task service SSE — `executeTask` subscriptions are managed by the task service bridge registry.

---

## 10. NEVER Rules (Compile-Free Footguns)

**NEVER** call **`this.task.start()`** from inside another task in a way that creates unbounded nested task explosions without a clear orchestration model.

**NEVER** mutate logical **`input`** fields between attempts to steer recovery — the engine replays stored input. If you need derived state, recompute it from `previousEvents` or durable storage.

**NEVER** throw inside **`recover`** expecting a silent full retry — treat throws as failed recovery per engine semantics.

**NEVER** assume **`ctx.context`** is non-null on recovery for **OBO** tasks. Without a fresh **`UserContext`** from **`resume`**, OBO handlers **MUST** fail closed (see analytics plugin). **Recovery that needs OBO `MUST` use `this.task.resume(ik, { userId, context })` from a new authorised HTTP request**, not blind `autoRecover: true`.

**NEVER** use **`simulateCrash`** outside dev/tests. It requires **`engine.enableTestMode: true`** in config and exists to exercise recovery paths.

**NEVER** import the vendored **`Task`** singleton or **`Engine`** directly for plugin business logic. Use **`this.task`** (via **`requireTask()`** when you need a non-null reference) so lifecycle, telemetry, and AppKit guards stay consistent.

**NEVER** emit **reserved** custom event names (section 4) — they are dropped with a warning and never reach the client.

---

## 11. Testing

**MUST** mock the task service in unit tests (for example `vi.mock("@databricks/appkit", …)` swapping the module surface) or inject a fake `TaskManager` if your harness supports it.
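For example, a minimal fake-context sketch (the `makeCtx` helper and inline `pipeline` handler below are illustrative, not AppKit exports): invoke the handler directly with a synthetic ctx whose `previousEvents` use the stored `custom:` form, and assert that recovery skips already-checkpointed work.

```typescript
import { expect, test, vi } from "vitest";

// Illustrative stand-ins, shaped after the engine's TaskContext.
type FakeEvent = { eventType: string; payload: unknown };

function makeCtx(previousEvents: FakeEvent[] = []) {
  return {
    isRecovery: previousEvents.length > 0,
    previousEvents,
    context: null as unknown,
    emit: vi.fn(async (_name: string, _payload: unknown) => {}),
  };
}

// Handler under test: a two-stage pipeline using the `custom:` scan
// pattern from section 8b.
async function pipeline(_input: { id: string }, ctx: ReturnType<typeof makeCtx>) {
  const done = new Set(
    ctx.previousEvents
      .filter((e) => e.eventType === "custom:stage_done")
      .map((e) => (e.payload as { stage: string }).stage),
  );
  if (!done.has("extract")) {
    await ctx.emit("stage_done", { stage: "extract" });
  }
  if (!done.has("load")) {
    await ctx.emit("stage_done", { stage: "load" });
  }
}

test("recovery does not re-run completed stages", async () => {
  // Recovery attempt: `extract` is already checkpointed in the WAL.
  const ctx = makeCtx([{ eventType: "custom:stage_done", payload: { stage: "extract" } }]);
  await pipeline({ id: "d1" }, ctx);

  expect(ctx.emit).toHaveBeenCalledTimes(1);
  expect(ctx.emit).toHaveBeenCalledWith("stage_done", { stage: "load" });
});
```

The same harness covers the double-commit assertion below: seed `previousEvents` with the full checkpoint set and assert zero emits.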
**SHOULD**, when integration-testing, opt into **`enableTestMode: true`** in the task service config (tests only), then exercise **`this.task.simulateCrash(idempotencyKey)`** and recovery.

**SHOULD** assert:

- Handlers do not double-commit side effects when re-run under the same IK.
- Recovery skips work based on **`custom:*`** events in `previousEvents`.
- OBO resume without **`context`** fails loudly where your plugin requires it.

---

## 12. Quick Reference

```typescript
import {
  step,
  TASK_IDEMPOTENCY_HEADER,
  type TaskContext,
} from "@databricks/appkit";

// setup (inside Plugin subclass)
async setup() {
  this.requireTask().task({
    name: "my-op",
    execute: (input, ctx) => this.myOp(input, ctx),
    autoRecover: true,
  });
}

// Route — service principal
this.route(router, {
  name: "run-sp",
  method: "post",
  path: "/run",
  handler: async (req, res) => {
    await this.executeTask(res, "my-op", req.body, {
      cancelOnDisconnect: true,
      disconnectGraceMs: 5000,
    });
    // res.getHeader(TASK_IDEMPOTENCY_HEADER) when headers are readable (same-origin fetch)
  },
});

// Route — OBO (executeTask under asUser proxy)
this.route(router, {
  name: "run-obo",
  method: "post",
  path: "/run-as-user",
  handler: async (req, res) => {
    await this.asUser(req).executeTask(res, "my-op", req.body, {
      cancelOnDisconnect: false,
    });
  },
});

// Handler with recovery inspection (engine stores custom:* on TaskEvent.eventType)
async myOp(input: MyInput, ctx: TaskContext): Promise<{ ok: boolean }> {
  const last = ctx.previousEvents.findLast(e => e.eventType === "custom:stage_done");
  void last;
  await ctx.emit("stage_done", { stage: 1 });
  return { ok: true };
}

// Programmatic consume
const tf = this.requireTask();
const handle = await tf.start("my-op", input, { userId: "optional-owner-id" });
for await (const ev of tf.subscribe(handle.idempotencyKey)) {
  void ev.event.eventType;
}
```

Named step (avoid anonymous `step(async (ctx) => …)` collisions):

```typescript
const fetchRows = step(async function fetchRows(ctx: TaskContext, id: string) {
  return lookup(id);
});
```

diff --git a/CLAUDE.md b/CLAUDE.md
index eaf504796..2457cede9 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -234,6 +234,60 @@ The SDK has built-in SSE support with automatic reconnection:

- Plugin name as default tracer/meter scope
- Supports traces, metrics, logs (configurable per plugin)

### Durable Task Service

A built-in **durable execution service** available to every plugin via `this.task`. Use it for any operation that needs to survive process restart: long-running queries, agent loops with expensive LLM calls, multi-step pipelines, sagas, async user-facing operations.

**Mental model:** a durable function with an event log. Handlers emit structured events during execution, those events are appended to a write-ahead log, and on crash the recovery worker re-spawns the handler with `previousEvents` so it can resume from the last meaningful checkpoint. There is no workflow DSL, no replay engine, no determinism constraint on the handler.

**Three primitives:**

- **Event log** — handlers emit checkpoints/intermediate results via `ctx.emit(name, payload)`. Each event is sequenced and committed to the WAL. The same log doubles as a real-time SSE stream for clients.
- **Smart recovery** — opt-in per task. If the task crashes, `recover` (or the same handler with `ctx.isRecovery = true`) gets `ctx.previousEvents` and decides what "resume" means.
- **WAL-first durability** — every state transition appends to a sequential WAL before anything else. A background worker batches flushes to SQLite. Sub-millisecond per-event latency, with explicit flush available when correctness needs strict persistence.

**Bootstrap:** the task service is initialized by `createApp` automatically. Default storage is SQLite at `.appkit/tasks/tasks.db` and WAL at `.appkit/tasks/wal`. SQLite storage is **ephemeral on Databricks Apps** (multi-pod, no shared volume) — the runtime logs a startup warning and falls back to "tasks die with the pod" semantics. For durability in Apps, configure `task: { storage: { backend: 'lakebase', connectionString: … } }`. Opt out entirely with `createApp({ task: false, plugins: [...] })`; `this.task` is then `null` and plugins must handle that case.

**Public API on `this.task`:**

```typescript
this.task.task({ name, execute, recover?, autoRecover? }) // register a durable task in setup()
this.task.start(name, input, opts)            // spawn an attempt; returns { idempotencyKey }
this.task.subscribe(idempotencyKey, lastSeq?) // async iterable of TaskEvent
this.task.resume(idempotencyKey, opts?)       // resume a paused/suspended task
this.task.stop(idempotencyKey, opts?)         // request cooperative cancellation
```

**The `executeTask` shortcut.** For the common "POST starts a durable task, GET streams its progress over SSE" pattern, use `this.executeTask(res, taskName, input, settings?)` from any route handler. It calls `start`, bridges `subscribe` to the SSE response with `Last-Event-ID` reconnection support, captures the request context for OBO automatically, and dedupes against existing tasks by idempotency key. **`executeTask` deliberately omits the `retry`, `cache`, and `timeout` knobs that `execute()` accepts** — the engine replaces those with stronger primitives (smart recovery, idempotency-key dedup, cooperative `stop`). It is a compile-time error to pass them.

**When to use which execution method:**

| Method | Use for | Durability | Recovery |
|---|---|---|---|
| `this.execute(fn, settings)` | Sub-second to ~5s reads with retry/cache/timeout | None | None (retry from scratch) |
| `this.executeStream(res, gen, settings)` | Live progress for short ops; no crash survival | None | None |
| `this.executeTask(res, name, input)` | Anything that should survive restart: agentic loops, long pipelines | WAL → SQLite | Smart, opt-in via `recover` |

**Recovery snippet:**

```typescript
this.task.task({
  name: "agent-loop",
  autoRecover: true,
  execute: async (input: ChatInput, ctx): Promise<{ turns: number }> => {
    if (ctx.isRecovery) {
      // Find the last successful checkpoint and resume from there.
      const lastTurn = ctx.previousEvents.findLast(e => e.eventType === "custom:turn_done");
      input = { ...input, resumeFromTurn: (lastTurn?.payload as { turn: number } | undefined)?.turn ?? 0 };
    }
    for (let turn = input.resumeFromTurn ?? 0; turn < MAX_TURNS; turn++) {
      const result = await this.callLlm(ctx, input);
      await ctx.emit("turn_done", { turn, result });
      if (result.stop_reason !== "tool_use") return { turns: turn + 1 };
    }
    return { turns: MAX_TURNS };
  },
});
```

**OBO (asUser) is automatic** when calling `executeTask` from inside a route handler — the bridge captures the active `UserContext` via the `asUser(req)` proxy and forwards it to the handler as `ctx.context`. Inside the handler, `appkit.<plugin>.asUser(ctx.context)` works exactly like in a normal route. **Never pass `context: req` manually** — `executeTask` does it for you.
`autoRecover: true` is incompatible with OBO (the recovery worker has no `UserContext` after restart) and produces a one-time runtime warning at the first OBO `executeTask` call for a misconfigured task; use `autoRecover: false` + explicit `this.task.resume()` from a fresh authenticated request instead.

### Analytics Query Pattern

The AnalyticsPlugin provides SQL query execution:

@@ -402,6 +456,7 @@ This project uses conventional commits (enforced by commitlint):

### Key Dependencies

 - `@databricks/sdk-experimental` v0.16.0 - Databricks services SDK
+- `@databricks/taskflow` (vendored at `packages/appkit/vendor/taskflow/`) - Durable execution engine (Rust + Node.js FFI). Bootstraps automatically with AppKit. Vendored artifacts are pinned in `packages/appkit/vendor/taskflow/VENDOR.json` (sha256 per platform); refresh by re-running the upstream `taskflow` build and copying outputs into the vendor directory, then rerun `pnpm test` with `APPKIT_VERIFY_TASKFLOW_VENDOR=1` to re-derive the digests.
 - `express` - HTTP server
 - `zod` - Runtime validation
 - `OpenTelemetry` - Observability (traces, metrics, logs)

@@ -412,9 +467,12 @@ This project uses conventional commits (enforced by commitlint):

 3. **Streaming-first** - Built-in SSE support with reconnection
 4. **Observability** - OpenTelemetry integration is first-class
 5. **Dev Experience** - HMR, hot-reload, source maps, inspection tools
+6. **Durable by default for long ops** - Any operation that should survive process restart belongs on the task service (`this.task` / `this.executeTask`). Don't reinvent durability with retry-and-disk.

### Graceful Shutdown

The server handles SIGTERM/SIGINT with:

-- 15-second timeout
-- Aborts in-flight operations
-- Closes connections gracefully
+- 15-second timeout (force-exit deadline)
+- Re-entrancy guard: the second signal during a Ctrl-C race is ignored
+- Aborts in-flight operations (SSE streams release HTTP connections first)
+- Drains core services in reverse boot order — the task manager walks active `executeTask` bridges and writes a final `event: error / server_shutting_down` frame so clients see a clean signal instead of an EOF
+- Closes HTTP listener last

diff --git a/packages/appkit/src/tasks/execute-task.ts b/packages/appkit/src/tasks/execute-task.ts
index ab32b43b9..8cf7b4d06 100644
--- a/packages/appkit/src/tasks/execute-task.ts
+++ b/packages/appkit/src/tasks/execute-task.ts
@@ -35,6 +35,19 @@ import type { ActiveBridge } from "./types";
  */
 export const TASK_IDEMPOTENCY_HEADER = "X-AppKit-Task-Idempotency-Key";

+/**
+ * Wall-clock idle keep-alive interval, in milliseconds. The engine
+ * already emits its own `heartbeat` events when the executor is
+ * actively running, but a task can be quiet for several seconds at a
+ * time (e.g. waiting on a slow downstream call, or stopped/paused
+ * while a client is still subscribed). 25 s sits comfortably under
+ * the typical 60 s idle-socket timeout enforced by AWS ELBs,
+ * Cloudflare, GCP HTTPS LBs, and most corporate proxies.
+ *
+ * @internal
+ */
+export const IDLE_KEEPALIVE_INTERVAL_MS = 25_000;
+
 const logger = createLogger("tasks:execute");

 /** Process-scoped to log the OBO+autoRecover warning once per (plugin, task). @internal */

@@ -95,6 +108,12 @@ export async function executeTask(
   // Hoisted so the outer `finally` can release the bridge regardless
   // of which branch unwinds.
   let unregisterBridge: (() => void) | null = null;
+  // Wall-clock idle keep-alive timer. Belt-and-braces on top of the
+  // engine `heartbeat` event stream: if the engine stays silent for
+  // more than {@link IDLE_KEEPALIVE_INTERVAL_MS} we still write a
+  // comment frame so an idle load balancer doesn't drop the socket
+  // mid-task. Cleared from the outer `finally`.
+  let idleKeepAlive: ReturnType<typeof setInterval> | null = null;
   try {
     // `context` is the live executor sidecar — never serialised, never
     // seen by recovery. Lets the task body re-enter `runInUserContext`

@@ -184,6 +203,16 @@
     };
     unregisterBridge = manager._registerBridge(bridge);

+    // Install the wall-clock keep-alive. `unref()` so this timer
+    // doesn't keep the process alive on its own — if everything else
+    // has shut down, the SSE response is already in the process of
+    // closing and there's nothing to keep alive.
+    idleKeepAlive = setInterval(() => {
+      if (res.writableEnded) return;
+      writeSseComment(res, "hb");
+    }, IDLE_KEEPALIVE_INTERVAL_MS);
+    idleKeepAlive.unref?.();
+
     for await (const streamEvent of manager.subscribe(
       idempotencyKey,
       lastEventId,

@@ -299,6 +328,7 @@
     );
     throw err;
   } finally {
+    if (idleKeepAlive) clearInterval(idleKeepAlive);
     unregisterBridge?.();
     span?.end();
   }

diff --git a/packages/appkit/src/tasks/tests/execute-task.test.ts b/packages/appkit/src/tasks/tests/execute-task.test.ts
new file mode 100644
index 000000000..50b10671a
--- /dev/null
+++ b/packages/appkit/src/tasks/tests/execute-task.test.ts
@@ -0,0 +1,175 @@

/**
 * Tests for the durable `executeTask` SSE bridge.
 *
 * Most behaviour of the bridge is covered indirectly by the analytics
 * plugin's integration tests (wire-shape, recovery, terminal events).
 * The cases below isolate the production-hardening primitives that
 * have no other test coverage:
 *
 * - the wall-clock idle keep-alive that fires when the engine stream
 *   is genuinely silent for `IDLE_KEEPALIVE_INTERVAL_MS` ms.
 */

import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
import { createStubTaskManager } from "../../../../../tools/test-helpers";
import { executeTask, IDLE_KEEPALIVE_INTERVAL_MS } from "../execute-task";
import type { TaskManager } from "../index";

type SsePiece = string;

/**
 * Minimal `express.Response` stand-in: captures every chunk written so
 * the test can assert SSE framing without spinning up a real server.
 */
function createMockResponse() {
  const chunks: SsePiece[] = [];
  let writableEnded = false;
  let headersSent = false;
  const headers: Record<string, string> = {};

  const req = {
    once: vi.fn(),
    header: vi.fn(() => undefined),
  };

  const res = {
    req,
    get statusCode() {
      return 200;
    },
    set statusCode(_v: number) {},
    get headersSent() {
      return headersSent;
    },
    get writableEnded() {
      return writableEnded;
    },
    setHeader: vi.fn((k: string, v: string) => {
      headers[k] = v;
    }),
    flushHeaders: vi.fn(() => {
      headersSent = true;
    }),
    write: vi.fn((chunk: string) => {
      chunks.push(chunk);
      return true;
    }),
    end: vi.fn(() => {
      writableEnded = true;
    }),
    status: vi.fn(function status(this: unknown, _code: number) {
      return res;
    }),
    json: vi.fn(),
  } as unknown as import("express").Response;

  return { res, chunks, headers, req };
}

/**
 * Minimal `ITelemetry` stand-in: returns no tracer, so the bridge skips
 * span work entirely. We just need the surface area for `deps`.
 */
function createNoopTelemetry() {
  return {
    getTracer: () => null,
    getMeter: () => null,
    getLogger: () => null,
  } as unknown as import("../../telemetry").ITelemetry;
}

describe("executeTask idle keep-alive", () => {
  beforeEach(() => {
    vi.useFakeTimers();
  });

  afterEach(() => {
    vi.useRealTimers();
  });

  test("writes an SSE comment after IDLE_KEEPALIVE_INTERVAL_MS while engine is silent", async () => {
    const stub = createStubTaskManager();
    const managerAsType = stub as unknown as TaskManager;
    const { res, chunks } = createMockResponse();

    // Handler awaits a deferred we hold open so the engine stream
    // stays silent past the keep-alive window. The fake-timer clock
    // can advance past the wall-clock interval without the handler
    // ever resolving, which is exactly the production-hostile shape
    // (long downstream call + idle proxy) the keep-alive exists for.
    let releaseHandler!: () => void;
    const handlerDone = new Promise<void>((resolve) => {
      releaseHandler = resolve;
    });
    stub.task({
      name: "silent-task",
      execute: async () => {
        await handlerDone;
        return { ok: true };
      },
    });

    const bridge = executeTask(
      {
        manager: managerAsType,
        telemetry: createNoopTelemetry(),
        pluginName: "test",
      },
      res,
      "silent-task",
      { x: 1 },
      { telemetry: { traces: false } },
    );

    // Flush microtasks so `manager.start` resolves and the keep-alive
    // interval is installed before we advance time.
    await vi.advanceTimersByTimeAsync(0);

    // No keep-alive yet: we haven't reached the interval boundary.
    expect(chunks.some((c) => c.startsWith(": hb"))).toBe(false);

    // Wall-clock keep-alive should fire once per interval window.
    await vi.advanceTimersByTimeAsync(IDLE_KEEPALIVE_INTERVAL_MS);
    const afterFirstFire = chunks.filter((c) => c.startsWith(": hb")).length;
    expect(afterFirstFire).toBeGreaterThanOrEqual(1);

    await vi.advanceTimersByTimeAsync(IDLE_KEEPALIVE_INTERVAL_MS);
    const afterSecondFire = chunks.filter((c) => c.startsWith(": hb")).length;
    expect(afterSecondFire).toBeGreaterThan(afterFirstFire);

    // Release the handler so the bridge can finish cleanly and the
    // outer `finally` clears the interval.
    releaseHandler();
    await bridge;
  });

  test("clears the keep-alive interval when the bridge exits", async () => {
    const stub = createStubTaskManager();
    const managerAsType = stub as unknown as TaskManager;
    const { res, chunks } = createMockResponse();
    stub.task({
      name: "fast-task",
      execute: async () => ({ ok: true }),
    });

    await executeTask(
      {
        manager: managerAsType,
        telemetry: createNoopTelemetry(),
        pluginName: "test",
      },
      res,
      "fast-task",
      {},
      { telemetry: { traces: false } },
    );

    // After the bridge exits, the keep-alive must not continue to
    // fire — advancing a full interval should add no new `: hb`
    // frames.
    const baseline = chunks.filter((c) => c.startsWith(": hb")).length;
    await vi.advanceTimersByTimeAsync(IDLE_KEEPALIVE_INTERVAL_MS * 2);
    const after = chunks.filter((c) => c.startsWith(": hb")).length;
    expect(after).toBe(baseline);
  });
});