From 89b2aa841225467a21013d36e182218add215b41 Mon Sep 17 00:00:00 2001 From: ditadi Date: Tue, 12 May 2026 13:37:04 +0100 Subject: [PATCH] =?UTF-8?q?chore(appkit):=20production=20hardening=20?= =?UTF-8?q?=E2=80=94=20shutdown=20re-entrancy=20+=20SSE=20idle=20keep-aliv?= =?UTF-8?q?e?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * SIGTERM/SIGINT re-entrancy guard in ServerPlugin._gracefulShutdown. In interactive dev the OS sends SIGINT and the supervisor follows up with SIGTERM, racing two _shutdownCoreServices invocations — observed failure modes are Lakebase pool double-close, OTLP batcher double- flush, and "process.exit called twice". Guard the second signal with a flag and log it at debug. * Wall-clock idle keep-alive in the executeTask SSE bridge (25 s, IDLE_KEEPALIVE_INTERVAL_MS). The engine emits heartbeat events only while the executor is actively running; a task waiting on a slow downstream call can stay quiet long enough for an AWS/GCP/Cloudflare load balancer to drop the idle socket (typical 60 s timeout). Belt-and-braces wall-clock interval on top of the engine path, unref()'d so it doesn't keep the process alive on its own, and cleared from the bridge's outer finally so any error or early-exit cleanup path doesn't leak the timer. * CLAUDE.md: new "TaskFlow Core Service" section documenting the mental model, public API on this.taskflow, the executeTask shortcut, the when-to-use-which-execution-method table, the recovery pattern, and OBO/autoRecover guardrails. Adds vendored @databricks/taskflow to Key Dependencies (sha256-pinned via VENDOR.json) and a sixth "durable by default for long ops" design principle. Updates Graceful Shutdown bullets to reflect the new re-entrancy behaviour and the bridge drain on shutdown. * .claude/references/taskflow.md (new, 404 lines): canonical NEVER/MUST/SHOULD reference for TaskFlow usage from plugins. Covers when-to-use, task registration, idempotency keys, handler signature and ctx.emit, recovery patterns (agent loop, staged pipeline, saga), OBO/asUser interaction, shutdown semantics, and conflict semantics on duplicate submits. CLAUDE.md links to it as the authoritative source for the rules its TaskFlow Core Service section summarises. Tests: - src/plugins/server/tests/server.test.ts: re-entrancy guard test fires two _gracefulShutdown calls in parallel and asserts shutdownCoreServices / abortActiveOperations / server.close each run exactly once. - src/taskflow/tests/execute-task.test.ts (new): two focused tests for the wall-clock keep-alive — one asserts the comment frame fires per IDLE_KEEPALIVE_INTERVAL_MS window while the engine is silent, the other asserts the interval is cleared once the bridge exits cleanly so a later advanceTimersByTime adds no new frames. Deliberately out of scope: - Register-time OBO+autoRecover hard-error in TaskflowService.task(): OBO-ness is a property of the *caller* of executeTask (the active UserContext scope), not the registration, so a register-time check is structurally not available without adding a new isOboOnly flag to TaskDefinition. The runtime first-call warning from PR 4 (oboAutoRecoverWarned set, deduped per (plugin, task)) covers the misconfiguration at the boundary where it materialises. - VENDOR.json source-commit pin: already shipped in PR 3. - Internal review notes (taskflow-review-findings.md, taskflow-review-plan.md) intentionally not committed. Validation: pnpm -r typecheck ✓, pnpm build ✓, pnpm exec biome check (touched files) ✓, pnpm exec knip ✓, pnpm test ✓ (126 files / 2307 tests; +1 file / +3 tests vs PR 6). Signed-off-by: ditadi --- .claude/references/tasks.md | 404 ++++++++++++++++++ CLAUDE.md | 64 ++- packages/appkit/src/tasks/execute-task.ts | 30 ++ .../src/tasks/tests/execute-task.test.ts | 175 ++++++++ 4 files changed, 670 insertions(+), 3 deletions(-) create mode 100644 .claude/references/tasks.md create mode 100644 packages/appkit/src/tasks/tests/execute-task.test.ts diff --git a/.claude/references/tasks.md b/.claude/references/tasks.md new file mode 100644 index 000000000..a9d2dfe11 --- /dev/null +++ b/.claude/references/tasks.md @@ -0,0 +1,404 @@ +# Durable Task Best Practices + +Reference guide for using the durable task service inside AppKit plugins. A built-in **durable execution service** — every plugin gets `this.task` (unless `createApp({ task: false })`) and `this.executeTask` on `Plugin`. + +Every guideline is prefixed with a severity tier: + +- **NEVER** — Security, correctness, or breakage blocker. Violating this corrupts state, double-charges users, or loses durability guarantees. +- **MUST** — Correctness requirement. Violating this produces bugs, broken recovery, or inconsistent behavior. +- **SHOULD** — Quality recommendation. Violating this degrades DX, performance, or maintainability. + +> **Scope:** the task service used from inside core or custom AppKit plugins (`packages/appkit/src/plugins/` or external). For the underlying engine architecture, see the upstream engine repo's `docs/ARCHITECTURE.md` and `docs/ONE_PAGER.md`. + +**Imports.** AppKit surfaces task types and helpers from `@databricks/appkit` (for example `step`, `TaskDefinition`, `TypedTaskContext`, `TaskContext`, `TASK_IDEMPOTENCY_HEADER`, `setupSseHeaders`, `writeSseFrame`). The vendored native engine lives under `packages/appkit/vendor/taskflow/`; do not import the vendored `Taskflow` facade from that path inside plugins — use `this.task` instead. + +--- + +## 1. When to Use Durable Tasks + +The decision is binary. Reach for it when **at least one** of these is true: + +- Operation runs longer than ~5 seconds and the user/client expects to see progress. +- Re-running from scratch on crash is **incorrect** (side effects already persisted) or **expensive** (compute, money, time). +- The operation must survive process restart (rolling deploy, crash, host reclaim). +- Multiple steps each have their own success/failure semantics that need to be tracked. + +**MUST** use `this.executeTask(res, name, input, settings?)` — not `this.execute()` — for any operation matching the above. Do not try to reinvent durability with the `retry` interceptor plus ad-hoc disk writes. + +**MUST NOT** use it for sub-second reads, stateless transformations, or fire-and-forget logging. The WAL append cost is small but the conceptual overhead (task naming, idempotency keys, recovery) is not. Use `this.execute()` with retry/cache/timeout interceptors instead. + +| Method | Latency budget | Crash survival | Use when | +|---|---|---|---| +| `this.execute(fn, settings)` | < 5s typical | None | Read with retry/cache/timeout | +| `this.executeStream(res, gen, settings)` | seconds to minutes | None | Live progress; loss-on-crash acceptable | +| `this.executeTask(res, name, input, settings?)` | seconds to hours | Full | Anything you would be sad to re-run from scratch | + +--- + +## 2. Task Registration + +**MUST** register every durable task in `setup()`, never in `injectRoutes()` or constructors. `setup()` runs once after plugins boot; route handlers run per request. + +**MUST** pass a single **`TaskDefinition`** object to `this.task.task(definition)`. The shipped shape is `{ name, execute, recover?, autoRecover? }`, not separate positional arguments. + +```typescript +import type { TypedTaskContext } from "@databricks/appkit"; + +async setup() { + this.task.task({ + name: "agent-loop", + execute: (input, ctx) => this.runAgentLoop(input, ctx), + recover: (input, ctx) => this.recoverAgentLoop(input, ctx), + autoRecover: true, + }); + this.task.task({ + name: "export-data", + execute: (input, ctx) => this.exportData(input, ctx), + autoRecover: true, + }); +} +``` + +**MUST** use lowercase kebab-case (or a consistent colon-separated prefix convention such as `my-plugin:query`) for the `name` field. The string you pass is the task name the engine stores; AppKit does not auto-prefix it. Multiple plugins should choose names that do not collide. + +**NEVER** register the same `name` twice expecting two handlers. The service **warns** and the new registration **replaces** the previous one; recovery and routing become confusing. + +**NEVER** change a task's `name` across releases without a migration plan. Names are persisted in the WAL — renaming orphans in-flight tasks across deploy. + +**MUST** understand `autoRecover`: the service records `definition.autoRecover ?? true`. So **`true` is the default for all tasks**, including OBO tasks. There is no special default that flips off for OBO. For OBO workloads, you **MUST** set `autoRecover: false` yourself: the recovery worker has no `UserContext`, so automatic recovery will break `asUser`-style calls after restart. + +**MUST** treat the runtime warning in `executeTask` honestly: if you register an OBO task with `autoRecover: true` (explicitly or by default), AppKit logs a **once-per-(plugin,task)** warning that recovery after restart will run without the original `UserContext`. That is guidance, not a change to the default — **fix the registration** (`autoRecover: false` + explicit `resume`) rather than relying on the warning. + +**SHOULD** provide `recover` when re-running `execute` from scratch would be expensive or unsafe. Without `recover`, the engine may still re-invoke the handler on recovery per engine semantics; `recover` is your typed hook when you need custom resumption logic. + +**SHOULD** use `this.task.task({ ... })` with a third generic (`Events`) so `ctx.emit` is tied to a known event map — the same names and payload shapes are what the SSE bridge exposes to clients. + +--- + +## 3. Idempotency Keys + +The engine derives each task's idempotency key (IK) from the **task name**, a **canonical form of the input**, and the **submit-time `userId`** (from the active user context when OBO, or absent for service-principal runs). Plugin code does **not** pass an IK override through `executeTask` — `ExecuteTaskSettings` has no `idempotencyKey` field. + +**MUST** treat the IK as the **identity of the logical task**. Duplicate submits with the same name + input + owner dedupe per the engine's `executeMode` (`SubmitOptions`). + +**MUST** design `input` so that everything that should distinguish two logical runs is **on the input object**. Example: the analytics plugin includes `queryKey`, statement, parameters, executor key, and format discriminator so distinct client operations do not collide. + +**SHOULD** return the IK to clients when they need reconnect or `/resume` / `/stop` follow-ups. The `executeTask` bridge sets the HTTP header **`TASK_IDEMPOTENCY_HEADER`** (`"X-AppKit-Task-Idempotency-Key"`) and sends an initial SSE frame `event: ready` with `data: {"idempotencyKey":...}` so cross-origin `EventSource` clients that cannot read headers still get the key. Import `TASK_IDEMPOTENCY_HEADER` from `@databricks/appkit` if you read it in middleware or tests. + +**NEVER** assume you can forge ownership by passing a user id from the client. `ExecuteTaskSettings` explicitly forbids a `userId` field (`never`) — identity comes only from `runInUserContext` / `asUser(req)`. + +**NEVER** fold attempt counters, retry numbers, or timestamps into inputs when they would change what should be the **same** logical task — that creates accidental IK churn. Conversely, do not omit fields from input that should separate two runs. + +**SHOULD** switch the storage backend for real multi-pod deployment: if the runtime looks like **Databricks Apps** (`DATABRICKS_APP_NAME`, `DATABRICKS_APP_ID`, or `DATABRICKS_APP_URL`) and you still use the default **SQLite** backend, AppKit logs a **WARN** that tasks will not survive rolling restarts; plan for **`lakebase`** (Postgres) or another shared backend via `createApp({ task: { storage: … } })`. + +--- + +## 4. Handler Signature + +The handler signature is fixed on `TaskDefinition.execute`: + +```typescript +execute(input: TInput, ctx: TypedTaskContext): Promise +``` + +**`ctx` provides** (engine `TaskContext`, narrowed for `emit` when you use `TypedTaskContext`): + +| Field | Type | Use | +|---|---|---| +| `ctx.emit(name, payload)` | `(string, unknown) => Promise` | Append a user event; becomes SSE after bridge rules below | +| `ctx.isRecovery` | `boolean` | `true` if this invocation is a recovery path | +| `ctx.previousEvents` | `TaskEvent[]` | Historical events for this task; inspect in recovery | +| `ctx.context` | `unknown \| null` | Live sidecar (e.g. `UserContext`); **not** persisted across crash | +| `ctx.idempotencyKey` | `string` | IK for this task | +| `ctx.attempt` | `number` | Attempt counter | +| `ctx.taskId`, `ctx.userId`, `ctx.heartbeat()` | … | See engine types in `task.d.ts` | + +**MUST** treat the handler as a function that may run in another process after failure. Durable state must come from `input`, `ctx.previousEvents`, or external storage — not from closures that assume a single long-lived Node process. + +**MUST** branch on `ctx.isRecovery` when semantics differ after restart. + +**MUST** await every `ctx.emit()` call. + +**MUST** when reading `ctx.previousEvents`, compare against engine **`eventType` strings as stored**, not the short names you passed to `emit`. User emits are persisted with a **`custom:`** prefix — e.g. `ctx.emit("tick", …)` produces `eventType: "custom:tick"`. The **`executeTask` SSE bridge strips `custom:` for frames sent to the client**, but **`previousEvents` in the handler still use the prefixed form.** + +**NEVER** emit custom events whose **short name** (after `custom:` is stripped for wire purposes) matches any reserved bridge or terminal name. If you `ctx.emit` one of these, the bridge **logs a warning and drops** the frame so clients do not get misleading terminal or control events: + +- `ready`, `error`, `heartbeat`, `completed`, `failed`, `cancelled`, `suspended` + +Choose alternatives such as `task_completed` or `query_tick`. + +**SHOULD** keep payloads JSON-friendly. The bridge serialises with `JSON.stringify` and a **BigInt** replacer (values become **decimal strings** on the wire) so warehouse `LONG`/`BIGINT` columns do not throw at serialisation time. + +--- + +## 5. The `step()` Helper (Not a Decorator) + +AppKit exports **`step`** as a **higher-order function** from `@databricks/appkit`. There is **no** `@step` decorator on `Plugin`. + +```typescript +import { step } from "@databricks/appkit"; + +const fetchInvoices = step(async (ctx, accountId: string) => { + return this.invoiceClient.list({ accountId }); +}); + +// Inside this.task.task({ execute: async (input, ctx) => { ... } }) +const rows = await fetchInvoices(ctx, input.accountId); +``` + +**Semantics:** + +- **`step(fn)` returns a memoised wrapper.** The binding to the native engine is **lazy**: it resolves on **first invocation** inside a running task, after the task service has initialised. Calling the wrapper **before** `createApp` has booted the task service throws `InitializationError`. +- The engine keys checkpoints using the wrapped function's **`Function.name`**. **Anonymous arrow functions all share an empty name and can collide.** **MUST** use **named function expressions** or **named async functions** for anything you wrap with `step`. +- On recovery, completed steps short-circuit to cached results — use for expensive or non-idempotent segments. + +**MUST** ensure step bodies are safe to skip on replay when already recorded (idempotent reads, or deduped writes). + +**NEVER** call `this.task.start()` from inside a `step` body or another task handler in a way that creates unbounded nested task fan-out. Orchestrate with separate tasks or explicit emits. + +**SHOULD** keep `step` units small and name them after their effect. Engine step events use the `custom:step:…` pattern on the WAL; the **`executeTask` bridge drops `custom:step:*` events** (WAL-only checkpoints, not client SSE). + +--- + +## 6. `executeTask`: The Common Pattern + +`this.executeTask(res, taskName, input, settings?)` bridges **`this.task.start` → SSE subscribe loop** for the POST-and-stream pattern. + +```typescript +injectRoutes(router: IAppRouter) { + this.route(router, { + name: "run", + method: "post", + path: "/run", + handler: async (req, res) => { + await this.executeTask(res, "agent-loop", req.body, { + cancelOnDisconnect: true, + disconnectGraceMs: 5000, + telemetry: { traces: true, metrics: true }, + }); + }, + }); +} +``` + +**`ExecuteTaskSettings`** (only fields that exist): + +- `cancelOnDisconnect?` — default `true`; when `true`, after the client closes the TCP connection the bridge waits **`disconnectGraceMs`** (default **5000** ms) then calls `this.task.stop` with reason `client_disconnected`. Set `cancelOnDisconnect: false` for long OBO runs where you expect `EventSource` reconnects. **Note:** OBO tokens still expire (~1 hour); multi-hour OBO work should use `autoRecover: false` and **`resume` from a fresh auth** before token expiry. +- `disconnectGraceMs?` — non-negative milliseconds; ignored when `cancelOnDisconnect` is `false`. +- `telemetry?` — `{ traces?, metrics? }` (defaults `true`). + +**Compile-time `never` guards** (do not pass): `retry`, `cache`, `timeout`, `stream`, **`userId`**. the task service replaces retry/cache/timeout; the bridge wire is fixed (no `stream.eventFilter`); identity must never be taken from request bodies. + +**Identity / OBO.** The bridge passes `userId` and `context` from **`getCurrentUserContext()`** into `this.task.start`. For OBO, call through the proxy: **`await this.asUser(req).executeTask(res, …)`**. For service principal, call `this.executeTask` without entering user context. + +**Wire behaviour (in order):** + +1. `this.task.start(taskName, input, { userId, context })`. +2. If headers are not yet sent: set **`X-AppKit-Task-Idempotency-Key`**, call `setupSseHeaders(res)`, write **`event: ready`** with the IK in JSON. +3. `this.task.subscribe(idempotencyKey, lastSeq)` where `lastSeq` comes from **`Last-Event-ID`** on the request. +4. For each stream event: **heartbeats** become SSE **comments** (`: hb`), **`custom:step:*`** are skipped, reserved custom names are dropped with a warning, other **`custom:`** events are stripped to the short name for `event:`, **`id:`** is set to `streamSeq` for replay, payloads are JSON with BigInt-safe stringification. +5. On engine terminal types **`completed`**, **`failed`**, **`cancelled`**, the bridge ends the response after forwarding that frame. +6. On AppKit shutdown, active bridges receive **`event: error`** with **`data: {"message":"server_shutting_down"}`** (best effort) before the iterator closes. + +**MUST NOT** pass `retry`, `cache`, `timeout` (interceptor sense), `stream`, or `userId` in settings — they are rejected by types. + +**MUST NOT** call `this.executeTask` outside a request handler without an Express `Response` you own. From `setup`, cron, or workers, use **`this.requireTask().start`** / **`subscribe`** directly (and set `SubmitOptions.context` yourself if OBO). + +**SHOULD** control what the client sees **only** via `ctx.emit` names and payloads (optionally typed with `TaskDefinition` generics). + +--- + +## 7. OBO (`asUser`) with the task service + +**MUST** use `await this.asUser(req).executeTask(...)` when the task body calls plugin code that depends on **`runInUserContext`** (warehouse OBO, etc.). The bridge forwards the live **`UserContext`** as `ctx.context` — it is **never** stored in SQLite; it exists only for the current attempt. + +**NEVER** pass a fake `userId` through settings (the field does not exist) or trust client-supplied identity for resume/stop. + +**MUST** for OBO tasks that must survive process restart **register with `autoRecover: false`** and design **`this.task.resume(idempotencyKey, { userId, context })`** from a **fresh authenticated request** that re-establishes **`UserContext`**: + +```typescript +// setup — OBO-capable task; no automatic recovery without context +this.task.task({ + name: `${this.name}:query`, + execute: (input, ctx) => this._runQuery(input, ctx), + autoRecover: false, +}); + +// Fresh authenticated route: resume must pass the same userId as submit time, and for OBO +// a live context object — mirror Plugin.asUser(req) (ServiceContext + token headers). +// See packages/appkit/src/plugins/analytics/analytics.ts (`_runQueryTask`) for the throw-if-missing contract. +await this.requireTask().resume(ikFromClient, { + userId: this.resolveUserId(req), + // OBO: add `context` — the live `UserContext` for this request (what `executeTask` forwards). +}); +``` + +The **analytics plugin** is the reference: `_runQueryTask` uses **`autoRecover: false`** and **throws** if `input.isAsUser` is true but `ctx.context` is missing (resume/recovery without `context: req` would otherwise risk running as the wrong principal). + +**MUST** pass **`context`** compatible with your handler when resuming OBO work — typically by routing resume through the same `asUser(req)` machinery so `executeTask` or your resume callsite captures `ServiceContext.createUserContext(...)`. + +**SHOULD** scope any **application-level cache** keys with `getCurrentUserId()` for OBO and a stable SP marker for service tasks. + +--- + +## 8. Recovery Patterns (Cookbook) + +Use the **`custom:`** prefix when scanning `ctx.previousEvents`. + +### 8a. Agentic Loop + +Persist conversation state in your DB; use events for streaming only. + +```typescript +this.task.task({ + name: "chat", + execute: async (input, ctx) => { + if (ctx.isRecovery) { + await ctx.emit("recovered", { turns: await this.countDbTurns(input.sessionId) }); + } + // … + }, + autoRecover: true, +}); +``` + +### 8b. Staged Pipeline + +```typescript +async pipeline(input: PipelineInput, ctx: TaskContext) { + const completed = new Set( + ctx.previousEvents + .filter(e => e.eventType === "custom:stage_done") + .map(e => (e.payload as { stage: string }).stage), + ); + if (!completed.has("extract")) { + await this.extract(input); + await ctx.emit("stage_done", { stage: "extract" }); + } + // … +} +``` + +### 8c. Saga (Forward + Compensate) + +```typescript +async saga(input: SagaInput, ctx: TaskContext) { + const completed = ctx.previousEvents + .filter(e => e.eventType === "custom:step_done") + .map(e => e.payload as { name: string; context: unknown }); + const failed = ctx.previousEvents.find(e => e.eventType === "custom:step_failed"); + // … +} +``` + +--- + +## 9. Errors, Suspension, and Shutdown + +**MUST** handle failures from **`this.task.stop`**, **`resume`**, and **`start`** explicitly in production code (`try/catch`, typed error inspection). The vendored `Engine` throws on some invalid transitions; there is no stable public `isTaskPauseError` helper exported from `@databricks/appkit` today — branch on message / name cautiously or map errors in your layer. + +**MUST** rely on AppKit graceful shutdown: **`TaskManager.shutdown()`** drains active SSE bridges ( **`server_shutting_down`** ) then shuts down the native engine. Plugins **MUST NOT** call low-level shutdown from random hooks unless you know you are replacing AppKit lifecycle. + +**SHOULD** call **`this.streamManager.abortAll()`** in **`shutdown()`** if your plugin uses **`executeStream`** alongside the task service SSE — `executeTask` subscriptions are managed by the the task service bridge registry. + +--- + +## 10. NEVER Rules (Compile-Free Footguns) + +**NEVER** call **`this.task.start()`** from inside another task in a way that creates unbounded nested task explosions without a clear orchestration model. + +**NEVER** mutate logical **`input`** fields between attempts for recovery incorrectly — the engine replays stored input. If you need derived state, recompute it from `previousEvents` or durable storage. + +**NEVER** throw inside **`recover`** expecting a silent full retry — treat throws as failed recovery per engine semantics. + +**NEVER** assume **`ctx.context`** is non-null on recovery for **OBO** tasks. Without a fresh **`UserContext`** from **`resume`**, OBO handlers **MUST** fail closed (see analytics plugin). **Recovery that needs OBO `MUST` use `this.task.resume(ik, { userId, context })` from a new authorised HTTP request**, not blind `autoRecover: true`. + +**NEVER** use **`simulateCrash`** outside dev/tests. It requires **`engine.enableTestMode: true`** in config and exists to exercise recovery paths. + +**NEVER** import the vendored **`Task`** singleton or **`Engine`** directly for plugin business logic. Use **`this.task`** (via **`requireTask()`** when you need a non-null reference) so lifecycle, telemetry, and AppKit guards stay consistent. + +**NEVER** emit **reserved** custom event names (section 4) — they are dropped with a warning and never reach the client. + +--- + +## 11. Testing + +**MUST** mock the task service in unit tests (for example `vi.mock("@databricks/appkit", …)` swapping the module surface) or inject a fake `TaskManager` if your harness supports it. + +**SHOULD** when integration-testing, opt into **`enableTestMode: true`** in the task service config only in tests, then exercise **`this.task.simulateCrash(idempotencyKey)`** and recovery. + +**SHOULD** assert: + +- Handlers do not double-commit side effects when re-run under the same IK. +- Recovery skips work based on **`custom:*`** events in `previousEvents`. +- OBO resume without **`context`** fails loudly where your plugin requires it. + +--- + +## 12. Quick Reference + +```typescript +import { + step, + TASK_IDEMPOTENCY_HEADER, + type TaskContext, +} from "@databricks/appkit"; + +// setup (inside Plugin subclass) +async setup() { + this.requireTask().task({ + name: "my-op", + execute: (input, ctx) => this.myOp(input, ctx), + autoRecover: true, + }); +} + +// Route — service principal +this.route(router, { + name: "run-sp", + method: "post", + path: "/run", + handler: async (req, res) => { + await this.executeTask(res, "my-op", req.body, { + cancelOnDisconnect: true, + disconnectGraceMs: 5000, + }); + // res.getHeader(TASK_IDEMPOTENCY_HEADER) when headers are readable (same-origin fetch) + }, +}); + +// Route — OBO (executeTask under asUser proxy) +this.route(router, { + name: "run-obo", + method: "post", + path: "/run-as-user", + handler: async (req, res) => { + await this.asUser(req).executeTask(res, "my-op", req.body, { + cancelOnDisconnect: false, + }); + }, +}); + +// Handler with recovery inspection (engine stores custom:* on TaskEvent.eventType) +async myOp(input: MyInput, ctx: TaskContext): Promise { + const last = ctx.previousEvents.findLast(e => e.eventType === "custom:stage_done"); + void last; + await ctx.emit("stage_done", { stage: 1 }); + return { ok: true }; +} + +// Programmatic consume +const tf = this.requireTask(); +const handle = await tf.start("my-op", input, { userId: "optional-owner-id" }); +for await (const ev of tf.subscribe(handle.idempotencyKey)) { + void ev.event.eventType; +} +``` + +Named step (avoid anonymous `step(async (ctx) => …)` collisions): + +```typescript +const fetchRows = step(async function fetchRows(ctx: TaskContext, id: string) { + return lookup(id); +}); +``` diff --git a/CLAUDE.md b/CLAUDE.md index eaf504796..2457cede9 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -234,6 +234,60 @@ The SDK has built-in SSE support with automatic reconnection: - Plugin name as default tracer/meter scope - Supports traces, metrics, logs (configurable per plugin) +### Durable Task Service + +A built-in **durable execution service** available to every plugin via `this.task`. Use it for any operation that needs to survive process restart: long-running queries, agent loops with expensive LLM calls, multi-step pipelines, sagas, async user-facing operations. + +**Mental model:** a durable function with an event log. Handlers emit structured events during execution, those events are appended to a write-ahead log, and on crash the recovery worker re-spawns the handler with `previousEvents` so it can resume from the last meaningful checkpoint. There is no workflow DSL, no replay engine, no determinism constraint on the handler. + +**Three primitives:** +- **Event log** — handlers emit checkpoints/intermediate results via `ctx.emit(name, payload)`. Each event is sequenced and committed to the WAL. Same log doubles as a real-time SSE stream for clients. +- **Smart recovery** — opt-in per task. If the task crashes, `recover` (or the same handler with `ctx.isRecovery = true`) gets `ctx.previousEvents` and decides what "resume" means. +- **WAL-first durability** — every state transition appends to a sequential WAL before anything else. A background worker batches flushes to SQLite. Sub-millisecond per-event latency, with explicit-flush available when correctness needs strict persistence. + +**Bootstrap:** the task service is initialized by `createApp` automatically. Default storage is SQLite at `.appkit/tasks/tasks.db` and WAL at `.appkit/tasks/wal`. SQLite storage is **ephemeral on Databricks Apps** (multi-pod, no shared volume) — the runtime logs a startup warning and falls back to "tasks die with the pod" semantics. For durability in Apps, configure `task: { storage: { backend: 'lakebase', connectionString: … } }`. Opt out entirely with `createApp({ task: false, plugins: [...] })`; `this.task` is then `null` and plugins must handle that case. + +**Public API on `this.task`:** +```typescript +this.task.task({ name, execute, recover?, autoRecover? }) // register a durable task in setup() +this.task.start(name, input, opts) // spawn an attempt; returns { idempotencyKey } +this.task.subscribe(idempotencyKey, lastSeq?) // async iterable of TaskEvent +this.task.resume(idempotencyKey, opts?) // resume a paused/suspended task +this.task.stop(idempotencyKey, opts?) // request cooperative cancellation +``` + +**The `executeTask` shortcut.** For the common "POST starts a durable task, GET streams its progress over SSE" pattern, use `this.executeTask(res, taskName, input, settings?)` from any route handler. It calls `start`, bridges `subscribe` to the SSE response with `Last-Event-ID` reconnection support, captures the request context for OBO automatically, and dedupes against existing tasks by idempotency key. **`executeTask` deliberately omits the `retry`, `cache`, and `timeout` knobs that `execute()` accepts** — the engine replaces those with stronger primitives (smart recovery, idempotency-key dedup, cooperative `stop`). Compile-time error if you try to pass them. + +**When to use which execution method:** + +| Method | Use for | Durability | Recovery | +|---|---|---|---| +| `this.execute(fn, settings)` | Sub-second to ~5s reads with retry/cache/timeout | None | None (retry from scratch) | +| `this.executeStream(res, gen, settings)` | Live progress for short ops; no crash survival | None | None | +| `this.executeTask(res, name, input)` | Anything that should survive restart, agentic loops, long pipelines | WAL → SQLite | Smart, opt-in via `recover` | + +**Recovery snippet:** +```typescript +this.task.task({ + name: "agent-loop", + autoRecover: true, + execute: async (input: ChatInput, ctx): Promise => { + if (ctx.isRecovery) { + // Find the last successful checkpoint and resume from there. + const lastTurn = ctx.previousEvents.findLast(e => e.eventType === "custom:turn_done"); + input = { ...input, resumeFromTurn: lastTurn?.payload.turn ?? 0 }; + } + for (let turn = input.resumeFromTurn ?? 0; turn < MAX_TURNS; turn++) { + const result = await this.callLlm(ctx, input); + await ctx.emit("turn_done", { turn, result }); + if (result.stop_reason !== "tool_use") return { turns: turn + 1 }; + } + }, +}); +``` + +**OBO (asUser) is automatic** when calling `executeTask` from inside a route handler — the bridge captures the active `UserContext` via the `asUser(req)` proxy and forwards it to the handler as `ctx.context`. Inside the handler, `appkit..asUser(ctx.context)` works exactly like in a normal route. **Never pass `context: req` manually** — `executeTask` does it for you. Combining `autoRecover: true` with OBO is incompatible (the recovery worker has no UserContext after restart) and produces a one-time runtime warning at the first OBO `executeTask` call for a misconfigured task; use `autoRecover: false` + explicit `this.task.resume()` from a fresh authenticated request instead. + ### Analytics Query Pattern The AnalyticsPlugin provides SQL query execution: @@ -402,6 +456,7 @@ This project uses conventional commits (enforced by commitlint): ### Key Dependencies - `@databricks/sdk-experimental` v0.16.0 - Databricks services SDK +- `@databricks/taskflow` (vendored at `packages/appkit/vendor/taskflow/`) - Durable execution engine (Rust + Node.js FFI). Bootstraps automatically with AppKit. Vendored artifacts are pinned in `packages/appkit/vendor/taskflow/VENDOR.json` (sha256 per platform); refresh by re-running the upstream `taskflow` build and copying outputs into the vendor directory, then rerun `pnpm test` with `APPKIT_VERIFY_TASKFLOW_VENDOR=1` to re-derive the digests. - `express` - HTTP server - `zod` - Runtime validation - `OpenTelemetry` - Observability (traces, metrics, logs) @@ -412,9 +467,12 @@ This project uses conventional commits (enforced by commitlint): 3. **Streaming-first** - Built-in SSE support with reconnection 4. **Observability** - OpenTelemetry integration is first-class 5. **Dev Experience** - HMR, hot-reload, source maps, inspection tools +6. **Durable by default for long ops** - Any operation that should survive process restart belongs on the task service (`this.task` / `this.executeTask`). Don't reinvent durability with retry-and-disk. ### Graceful Shutdown The server handles SIGTERM/SIGINT with: -- 15-second timeout -- Aborts in-flight operations -- Closes connections gracefully +- 15-second timeout (force-exit deadline) +- Re-entrancy guard: the second signal during a Ctrl-C race is ignored +- Aborts in-flight operations (SSE streams release HTTP connections first) +- Drains core services in reverse boot order — the task manager walks active `executeTask` bridges and writes a final `event: error / server_shutting_down` frame so clients see a clean signal instead of an EOF +- Closes HTTP listener last diff --git a/packages/appkit/src/tasks/execute-task.ts b/packages/appkit/src/tasks/execute-task.ts index ab32b43b9..8cf7b4d06 100644 --- a/packages/appkit/src/tasks/execute-task.ts +++ b/packages/appkit/src/tasks/execute-task.ts @@ -35,6 +35,19 @@ import type { ActiveBridge } from "./types"; */ export const TASK_IDEMPOTENCY_HEADER = "X-AppKit-Task-Idempotency-Key"; +/** + * Wall-clock idle keep-alive interval, in milliseconds. The engine + * already emits its own `heartbeat` events when the executor is + * actively running, but a task can be quiet for several seconds at a + * time (e.g. waiting on a slow downstream call, or stopped/paused + * while a client is still subscribed). 25 s sits comfortably under + * the typical 60 s idle-socket timeout enforced by AWS ELBs, + * Cloudflare, GCP HTTPS LBs, and most corporate proxies. + * + * @internal + */ +export const IDLE_KEEPALIVE_INTERVAL_MS = 25_000; + const logger = createLogger("tasks:execute"); /** Process-scoped to log the OBO+autoRecover warning once per (plugin, task). @internal */ @@ -95,6 +108,12 @@ export async function executeTask( // Hoisted so the outer `finally` can release the bridge regardless // of which branch unwinds. let unregisterBridge: (() => void) | null = null; + // Wall-clock idle keep-alive timer. Belt-and-braces on top of the + // engine `heartbeat` event stream: if the engine stays silent for + // more than {@link IDLE_KEEPALIVE_INTERVAL_MS} we still write a + // comment frame so an idle load balancer doesn't drop the socket + // mid-task. Cleared from the outer `finally`. + let idleKeepAlive: ReturnType | null = null; try { // `context` is the live executor sidecar — never serialised, never // seen by recovery. Lets the task body re-enter `runInUserContext` @@ -184,6 +203,16 @@ export async function executeTask( }; unregisterBridge = manager._registerBridge(bridge); + // Install the wall-clock keep-alive. `unref()` so this timer + // doesn't keep the process alive on its own — if everything else + // has shut down, the SSE response is already in the process of + // closing and there's nothing to keep alive. + idleKeepAlive = setInterval(() => { + if (res.writableEnded) return; + writeSseComment(res, "hb"); + }, IDLE_KEEPALIVE_INTERVAL_MS); + idleKeepAlive.unref?.(); + for await (const streamEvent of manager.subscribe( idempotencyKey, lastEventId, @@ -299,6 +328,7 @@ export async function executeTask( ); throw err; } finally { + if (idleKeepAlive) clearInterval(idleKeepAlive); unregisterBridge?.(); span?.end(); } diff --git a/packages/appkit/src/tasks/tests/execute-task.test.ts b/packages/appkit/src/tasks/tests/execute-task.test.ts new file mode 100644 index 000000000..50b10671a --- /dev/null +++ b/packages/appkit/src/tasks/tests/execute-task.test.ts @@ -0,0 +1,175 @@ +/** + * Tests for the durable `executeTask` SSE bridge. + * + * Most behaviour of the bridge is covered indirectly by the analytics + * plugin's integration tests (wire-shape, recovery, terminal events). + * The cases below isolate the production-hardening primitives that + * have no other test coverage: + * + * - the wall-clock idle keep-alive that fires when the engine stream + * is genuinely silent for `IDLE_KEEPALIVE_INTERVAL_MS` ms. + */ + +import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; +import { createStubTaskManager } from "../../../../../tools/test-helpers"; +import { executeTask, IDLE_KEEPALIVE_INTERVAL_MS } from "../execute-task"; +import type { TaskManager } from "../index"; + +type SsePiece = string; + +/** + * Minimal `express.Response` stand-in: captures every chunk written so + * the test can assert SSE framing without spinning up a real server. + */ +function createMockResponse() { + const chunks: SsePiece[] = []; + let writableEnded = false; + let headersSent = false; + const headers: Record = {}; + + const req = { + once: vi.fn(), + header: vi.fn(() => undefined), + }; + + const res = { + req, + get statusCode() { + return 200; + }, + set statusCode(_v: number) {}, + get headersSent() { + return headersSent; + }, + get writableEnded() { + return writableEnded; + }, + setHeader: vi.fn((k: string, v: string) => { + headers[k] = v; + }), + flushHeaders: vi.fn(() => { + headersSent = true; + }), + write: vi.fn((chunk: string) => { + chunks.push(chunk); + return true; + }), + end: vi.fn(() => { + writableEnded = true; + }), + status: vi.fn(function status(this: unknown, _code: number) { + return res; + }), + json: vi.fn(), + } as unknown as import("express").Response; + + return { res, chunks, headers, req }; +} + +/** + * Minimal `ITelemetry` stand-in: returns no tracer, so the bridge skips + * span work entirely. We just need the surface area for `deps`. + */ +function createNoopTelemetry() { + return { + getTracer: () => null, + getMeter: () => null, + getLogger: () => null, + } as unknown as import("../../telemetry").ITelemetry; +} + +describe("executeTask idle keep-alive", () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + test("writes an SSE comment after IDLE_KEEPALIVE_INTERVAL_MS while engine is silent", async () => { + const stub = createStubTaskManager(); + const managerAsType = stub as unknown as TaskManager; + const { res, chunks } = createMockResponse(); + + // Handler awaits a deferred we hold open so the engine stream + // stays silent past the keep-alive window. The fake-timer clock + // can advance past the wall-clock interval without `def.execute` + // ever resolving, which is exactly the production-hostile shape + // (long downstream call + idle proxy) the keep-alive exists for. + let releaseHandler!: () => void; + const handlerDone = new Promise((resolve) => { + releaseHandler = resolve; + }); + stub.task({ + name: "silent-task", + execute: async () => { + await handlerDone; + return { ok: true }; + }, + }); + + const bridge = executeTask( + { + manager: managerAsType, + telemetry: createNoopTelemetry(), + pluginName: "test", + }, + res, + "silent-task", + { x: 1 }, + { telemetry: { traces: false } }, + ); + + // Flush microtasks so `manager.start` resolves and the keep-alive + // interval is installed before we advance time. + await vi.advanceTimersByTimeAsync(0); + + // No keep-alive yet: we haven't reached the interval boundary. + expect(chunks.some((c) => c.startsWith(": hb"))).toBe(false); + + // Wall-clock keep-alive should fire once per interval window. + await vi.advanceTimersByTimeAsync(IDLE_KEEPALIVE_INTERVAL_MS); + const afterFirstFire = chunks.filter((c) => c.startsWith(": hb")).length; + expect(afterFirstFire).toBeGreaterThanOrEqual(1); + + await vi.advanceTimersByTimeAsync(IDLE_KEEPALIVE_INTERVAL_MS); + const afterSecondFire = chunks.filter((c) => c.startsWith(": hb")).length; + expect(afterSecondFire).toBeGreaterThan(afterFirstFire); + + // Release the handler so the bridge can finish cleanly and the + // outer `finally` clears the interval. + releaseHandler(); + await bridge; + }); + + test("clears the keep-alive interval when the bridge exits", async () => { + const stub = createStubTaskManager(); + const managerAsType = stub as unknown as TaskManager; + const { res, chunks } = createMockResponse(); + stub.task({ + name: "fast-task", + execute: async () => ({ ok: true }), + }); + + await executeTask( + { + manager: managerAsType, + telemetry: createNoopTelemetry(), + pluginName: "test", + }, + res, + "fast-task", + {}, + { telemetry: { traces: false } }, + ); + + // After the bridge exits, the keep-alive must not continue to + // fire — advancing a full interval should add no new `: hb` + // frames. + const baseline = chunks.filter((c) => c.startsWith(": hb")).length; + await vi.advanceTimersByTimeAsync(IDLE_KEEPALIVE_INTERVAL_MS * 2); + const after = chunks.filter((c) => c.startsWith(": hb")).length; + expect(after).toBe(baseline); + }); +});