From c02bcd9a55d1328b10aefacc427b0cc2724d3e07 Mon Sep 17 00:00:00 2001 From: ditadi Date: Tue, 12 May 2026 13:19:33 +0100 Subject: [PATCH] feat(dev-playground): durable-task example + typed SSE client helper MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The reference implementation for plugin authors. A demo plugin covering both TaskFlow recovery patterns (manual via `ctx.previousEvents` and structural via `step()`), a typed frontend SSE consumer (`subscribeToTaskflowTask`), and the `connectSSE` parser extension that captures `event:` field names. Bottom-up teaching: server plugin → bridge → client helper → React UI. Server demo plugin (`apps/dev-playground/server/durable-task-example-plugin.ts`): - `count-to-n` task — manual recovery via `ctx.previousEvents`. Ticks once per `sleepMs`, emitting typed `tick` events. On recovery, scans the event log to find the last persisted tick and resumes from there. The pattern for "checkpoint is the last time I emitted X" with no expensive computation to memoize. - `pipeline-with-steps` task — automatic recovery via `step()`. Wraps each stage (extract → transform → load) with `step()`, which memoizes its result in the WAL the first time it runs. On recovery, completed stages return the cached value without re-executing. The pattern for stages that are expensive (LLM calls, large queries) and unsafe to replay. - Routes (mounted under `/api/durable-example`): - `POST /run`, `POST /run-pipeline` — start + bridge SSE via `executeTask`. - `POST /crash/:id` — `simulateCrash` (gated behind `NODE_ENV !== "production"`). - `POST /stop/:id` — cooperative `taskflow.stop({ reason })`. - `POST /nudge-recovery` — re-submits the original input so the same IK triggers stale-Running recovery (`engine.resume()` only applies to Suspended tasks, so the demo "nudges" the engine). - `GET /reattach/:id` — bridges an SSE stream onto an existing task by IK via `subscribe()` + `setupSseHeaders` + `writeSseFrame` directly (the `executeTask` path would derive a new IK). Performs an OBO ownership check via `asUser(req).reconnect(id, userId)` before subscribing (F11 fix). - Registers with `enableTestMode: true` so `simulateCrash` is available; the route handler additionally gates on `NODE_ENV` so a misconfigured production deployment can't crash live tasks. `apps/dev-playground/server/index.ts`: - Registers the demo plugin and enables test mode on the TaskFlow config (`taskflow: { engine: { enableTestMode: true } }`) so `simulateCrash` is callable from the demo route. Client React route (`apps/dev-playground/client/src/routes/durable-task.route.tsx`): - Exercises both tasks end-to-end: `POST /run` then opens an SSE stream via `subscribeToTaskflowTask`. Renders `tick` / `recovered` events for `count-to-n`; renders `stage_started` / `stage_done` / `recovered` for `pipeline-with-steps` (which surfaces "from cache" on recovered stages). - Buttons for Stop, Crash, Nudge, and Reattach exercise the full cancellation / crash / recovery / re-attach loop. - Adds nav entries in `__root.tsx`, `index.tsx`, and the TanStack-generated `routeTree.gen.ts`. Typed client helper (`packages/appkit-ui/src/js/sse/subscribe-taskflow-task.ts`): - `subscribeToTaskflowTask(url, { onEvent, onComplete, onError, signal? })` — typed async API consuming the AppKit SSE bridge. Each `event: ` frame is dispatched to `onEvent[name]` with `payload` typed as `TEvents[name]`. - Terminal events (`completed`, `failed`, `cancelled`) resolve / reject the returned promise so plugins can `await` the durable run without an event handler. - `Last-Event-ID` reconnection: the helper tracks the highest seen `id:` frame and reattaches with that header on transient network failure. Tests assert the reconnect math is correct. - Includes tests for happy-path streaming, terminal events, abort via `AbortSignal`, and Last-Event-ID reconnect. `connect-sse` extension (`packages/appkit-ui/src/js/sse/connect-sse.ts`, `types.ts`, `index.ts`): - The generic SSE parser captures `event:` field names alongside `data:` payloads. `SSEMessage` gains `event?: string` so any AppKit SSE consumer can inspect the event name without re-parsing. Tests cover multi-line `data:` joining, CRLF normalisation, and comment-frame handling. - Export the new typed helper from `index.ts`. Gitignore: - `apps/dev-playground/.gitignore` adds `tasks.*` / `*.wal` patterns as a defensive belt-and-braces around the existing `.appkit/` exclusion. The demo plugin may configure storage at the playground root for diagnostics; the additional patterns keep `tasks.db` and the rotating WAL out of git regardless of `databasePath`. Verify: - `pnpm -r typecheck`, `pnpm build`, `pnpm test` (125 files, 2304 tests) all green. - `pnpm exec biome check` clean on touched files. - `pnpm exec knip` clean. Risk. Demo plugin is unauthenticated by design (it ships with the dev playground, not the SDK). `/crash/:id` returns 404 in production via `NODE_ENV` gate; `enableTestMode` flips on `simulateCrash`. The demo route handlers do not enforce auth — they assume the playground sits behind the Databricks Apps proxy. Document in deployment notes. Not in this PR. No production-plugin changes. No doc rewrite — that's PR 7. The `subscribeToTaskflowTask` helper currently requires plugin authors to redeclare `TEvents` client-side; a future follow-up (F26) would derive it from the registered `TaskHandle`. Stacked on: stack/taskflow/analytics-migration. Signed-off-by: ditadi --- apps/dev-playground/.gitignore | 3 + .../client/src/routeTree.gen.ts | 21 + .../client/src/routes/__root.tsx | 8 + .../client/src/routes/durable-task.route.tsx | 679 ++++++++++++++++++ .../client/src/routes/index.tsx | 19 + .../server/durable-task-example-plugin.ts | 525 ++++++++++++++ apps/dev-playground/server/index.ts | 28 + .../appkit-ui/src/js/sse/connect-sse.test.ts | 220 ++++++ packages/appkit-ui/src/js/sse/connect-sse.ts | 13 +- packages/appkit-ui/src/js/sse/index.ts | 1 + .../js/sse/subscribe-taskflow-task.test.ts | 409 +++++++++++ .../src/js/sse/subscribe-taskflow-task.ts | 390 ++++++++++ packages/appkit-ui/src/js/sse/types.ts | 7 + 13 files changed, 2322 insertions(+), 1 deletion(-) create mode 100644 apps/dev-playground/client/src/routes/durable-task.route.tsx create mode 100644 apps/dev-playground/server/durable-task-example-plugin.ts create mode 100644 packages/appkit-ui/src/js/sse/connect-sse.test.ts create mode 100644 packages/appkit-ui/src/js/sse/subscribe-taskflow-task.test.ts create mode 100644 packages/appkit-ui/src/js/sse/subscribe-taskflow-task.ts diff --git a/apps/dev-playground/.gitignore b/apps/dev-playground/.gitignore index d36ad9c86..69d2a02c8 100644 --- a/apps/dev-playground/.gitignore +++ b/apps/dev-playground/.gitignore @@ -5,5 +5,8 @@ playwright-report/ # Auto-generated types (endpoint-specific, varies per developer) shared/appkit-types/serving.d.ts +tasks.* +*.wal + # TaskFlow durable storage (SQLite + WAL); per-machine, never checked in. .appkit/ diff --git a/apps/dev-playground/client/src/routeTree.gen.ts b/apps/dev-playground/client/src/routeTree.gen.ts index 45e280700..0b7507d91 100644 --- a/apps/dev-playground/client/src/routeTree.gen.ts +++ b/apps/dev-playground/client/src/routeTree.gen.ts @@ -20,6 +20,7 @@ import { Route as LakebaseRouteRouteImport } from './routes/lakebase.route' import { Route as JobsRouteRouteImport } from './routes/jobs.route' import { Route as GenieRouteRouteImport } from './routes/genie.route' import { Route as FilesRouteRouteImport } from './routes/files.route' +import { Route as DurableTaskRouteRouteImport } from './routes/durable-task.route' import { Route as DataVisualizationRouteRouteImport } from './routes/data-visualization.route' import { Route as ChartInferenceRouteRouteImport } from './routes/chart-inference.route' import { Route as ArrowAnalyticsRouteRouteImport } from './routes/arrow-analytics.route' @@ -81,6 +82,11 @@ const FilesRouteRoute = FilesRouteRouteImport.update({ path: '/files', getParentRoute: () => rootRouteImport, } as any) +const DurableTaskRouteRoute = DurableTaskRouteRouteImport.update({ + id: '/durable-task', + path: '/durable-task', + getParentRoute: () => rootRouteImport, +} as any) const DataVisualizationRouteRoute = DataVisualizationRouteRouteImport.update({ id: '/data-visualization', path: '/data-visualization', @@ -113,6 +119,7 @@ export interface FileRoutesByFullPath { '/arrow-analytics': typeof ArrowAnalyticsRouteRoute '/chart-inference': typeof ChartInferenceRouteRoute '/data-visualization': typeof DataVisualizationRouteRoute + '/durable-task': typeof DurableTaskRouteRoute '/files': typeof FilesRouteRoute '/genie': typeof GenieRouteRoute '/jobs': typeof JobsRouteRoute @@ -131,6 +138,7 @@ export interface FileRoutesByTo { '/arrow-analytics': typeof ArrowAnalyticsRouteRoute '/chart-inference': typeof ChartInferenceRouteRoute '/data-visualization': typeof DataVisualizationRouteRoute + '/durable-task': typeof DurableTaskRouteRoute '/files': typeof FilesRouteRoute '/genie': typeof GenieRouteRoute '/jobs': typeof JobsRouteRoute @@ -150,6 +158,7 @@ export interface FileRoutesById { '/arrow-analytics': typeof ArrowAnalyticsRouteRoute '/chart-inference': typeof ChartInferenceRouteRoute '/data-visualization': typeof DataVisualizationRouteRoute + '/durable-task': typeof DurableTaskRouteRoute '/files': typeof FilesRouteRoute '/genie': typeof GenieRouteRoute '/jobs': typeof JobsRouteRoute @@ -170,6 +179,7 @@ export interface FileRouteTypes { | '/arrow-analytics' | '/chart-inference' | '/data-visualization' + | '/durable-task' | '/files' | '/genie' | '/jobs' @@ -188,6 +198,7 @@ export interface FileRouteTypes { | '/arrow-analytics' | '/chart-inference' | '/data-visualization' + | '/durable-task' | '/files' | '/genie' | '/jobs' @@ -206,6 +217,7 @@ export interface FileRouteTypes { | '/arrow-analytics' | '/chart-inference' | '/data-visualization' + | '/durable-task' | '/files' | '/genie' | '/jobs' @@ -225,6 +237,7 @@ export interface RootRouteChildren { ArrowAnalyticsRouteRoute: typeof ArrowAnalyticsRouteRoute ChartInferenceRouteRoute: typeof ChartInferenceRouteRoute DataVisualizationRouteRoute: typeof DataVisualizationRouteRoute + DurableTaskRouteRoute: typeof DurableTaskRouteRoute FilesRouteRoute: typeof FilesRouteRoute GenieRouteRoute: typeof GenieRouteRoute JobsRouteRoute: typeof JobsRouteRoute @@ -317,6 +330,13 @@ declare module '@tanstack/react-router' { preLoaderRoute: typeof FilesRouteRouteImport parentRoute: typeof rootRouteImport } + '/durable-task': { + id: '/durable-task' + path: '/durable-task' + fullPath: '/durable-task' + preLoaderRoute: typeof DurableTaskRouteRouteImport + parentRoute: typeof rootRouteImport + } '/data-visualization': { id: '/data-visualization' path: '/data-visualization' @@ -361,6 +381,7 @@ const rootRouteChildren: RootRouteChildren = { ArrowAnalyticsRouteRoute: ArrowAnalyticsRouteRoute, ChartInferenceRouteRoute: ChartInferenceRouteRoute, DataVisualizationRouteRoute: DataVisualizationRouteRoute, + DurableTaskRouteRoute: DurableTaskRouteRoute, FilesRouteRoute: FilesRouteRoute, GenieRouteRoute: GenieRouteRoute, JobsRouteRoute: JobsRouteRoute, diff --git a/apps/dev-playground/client/src/routes/__root.tsx b/apps/dev-playground/client/src/routes/__root.tsx index db42fdafb..92a228e07 100644 --- a/apps/dev-playground/client/src/routes/__root.tsx +++ b/apps/dev-playground/client/src/routes/__root.tsx @@ -64,6 +64,14 @@ function RootComponent() { Reconnect + + + + + + +
+ {/* Inputs */} +
+ {taskKind === "count" && ( + + )} + + +
+ +
+ Idempotency key (engine-derived):{" "} + + {idempotencyKey ?? "(will be issued on first run)"} + +
+ + {/* Actions */} +
+ + + + + + +
+ + {/* Progress */} + {taskKind === "count" ? ( + + ) : ( + + )} +
+ + {/* Event log */} +
+
Event log
+
    + {log.length === 0 && ( +
  • No events yet.
  • + )} + {log.map((entry) => ( +
  • + {new Date(entry.ts).toLocaleTimeString()} — {entry.kind}:{" "} + {entry.message} +
  • + ))} +
+
+ + + ); +} + +// ── per-task progress views ─────────────────────────────────────────── + +function CountProgress({ + value, + total, + percent, + recovered, +}: { + value: number; + total: number; + percent: number; + recovered: boolean; +}) { + return ( +
+
+ + Progress: {value} + {total > 0 ? ` / ${total}` : ""} + {recovered ? " (recovered)" : ""} + + {percent}% +
+
+
+
+
+ ); +} + +function PipelineProgress({ + stages, +}: { + stages: Record< + PipelineStage, + { state: "idle" | "running" | "done"; cached: boolean } + >; +}) { + const order: PipelineStage[] = ["extract", "transform", "load"]; + return ( +
+
Pipeline stages
+
+ {order.map((stage) => { + const s = stages[stage]; + const tone = + s.state === "done" + ? s.cached + ? "bg-amber-100 border-amber-400 text-amber-900" + : "bg-emerald-100 border-emerald-400 text-emerald-900" + : s.state === "running" + ? "bg-blue-100 border-blue-400 text-blue-900 animate-pulse" + : "bg-muted border-muted-foreground/20 text-muted-foreground"; + return ( +
+
{stage}
+
+ {s.state} + {s.state === "done" && s.cached ? " (from cache)" : ""} +
+
+ ); + })} +
+
+ Recovery hint: simulate a crash mid-pipeline, then nudge — completed + stages replay from cache (yellow), the in-flight one re-runs. +
+
+ ); +} + +// ── helpers ─────────────────────────────────────────────────────────── + +function makeRunKey(): string { + return `run-${Math.random().toString(36).slice(2, 10)}`; +} + +function logClass(kind: LogEntry["kind"]): string { + switch (kind) { + case "error": + return "text-destructive"; + case "recovered": + return "text-amber-600"; + case "cached": + return "text-amber-600"; + case "cancelled": + return "text-amber-600"; + case "done": + return "text-emerald-600"; + case "stage": + return "text-blue-600"; + default: + return ""; + } +} diff --git a/apps/dev-playground/client/src/routes/index.tsx b/apps/dev-playground/client/src/routes/index.tsx index ec2d9a50a..c4646959d 100644 --- a/apps/dev-playground/client/src/routes/index.tsx +++ b/apps/dev-playground/client/src/routes/index.tsx @@ -90,6 +90,25 @@ function IndexRoute() {
+ +
+

+ Durable Task +

+

+ Demonstrates `this.executeTask`: a count-to-N task that streams + progress over SSE and survives a simulated process crash thanks + to TaskFlow's smart recovery. +

+ +
+
+

diff --git a/apps/dev-playground/server/durable-task-example-plugin.ts b/apps/dev-playground/server/durable-task-example-plugin.ts new file mode 100644 index 000000000..48d000b5f --- /dev/null +++ b/apps/dev-playground/server/durable-task-example-plugin.ts @@ -0,0 +1,525 @@ +/** + * Durable Task Example — showcases TaskFlow's two recovery patterns. + * + * Two tasks are registered: + * + * `count-to-n` — manual recovery via `ctx.previousEvents`. The handler + * ticks once per `sleepMs`, emitting `tick` events. On recovery, it + * scans the event log to find the last persisted tick and resumes + * from there. Use this pattern when a checkpoint is "the last time + * I emitted X" and there's no expensive computation to memoize. + * + * `pipeline-with-steps` — automatic recovery via `step()`. Each stage + * (extract → transform → load) is wrapped with `step()`, which + * memoizes its result in the WAL the first time it runs. On + * recovery, completed stages return their cached value without + * re-executing. Use this pattern when stages are expensive (LLM + * calls, large queries, network I/O) and replay is unsafe. + * + * Routes (mounted under `/api/durable-example`): + * POST /run — start a `count-to-n` run, bridges SSE. + * POST /run-pipeline — start a `pipeline-with-steps` run, bridges SSE. + * POST /crash/:id — `simulateCrash` for the task with idempotencyKey :id. + * POST /stop/:id — cooperative cancel via `taskflow.stop()`. + * POST /nudge-recovery — re-`start()` with the same body as the + * original run (same input ⇒ same engine IK). + * Triggers stale-Running recovery after a + * `simulateCrash`; unlike `engine.resume()` + * which only applies to **Suspended** tasks. + * GET /reattach/:id — bridge an SSE stream onto an existing run + * by IK (uses `subscribe` directly because + * `executeTask` would derive a new IK from + * this synthetic input). + * + * The frontend route at `/durable-task` exercises all of the above. + */ +import { + Plugin, + type PluginManifest, + setupSseHeaders, + step, + type TaskContext, + type TypedTaskContext, + toPlugin, + writeSseFrame, +} from "@databricks/appkit"; +import type { IAppRouter } from "shared"; + +const TASKS = { + countToN: "count-to-n", + pipeline: "pipeline-with-steps", +} as const; +type TaskName = (typeof TASKS)[keyof typeof TASKS]; + +// Names the TaskFlow SSE bridge writes itself (or the engine emits). +// A user `ctx.emit("completed", …)` would otherwise round-trip on the +// wire as `event: completed` and trigger the EventSource close path. +// Mirror this in any hand-rolled bridge (see `/reattach/:id` below). +const BRIDGE_RESERVED = new Set([ + "ready", + "error", + "heartbeat", + "completed", + "failed", + "cancelled", + "suspended", +]); + +// ── count-to-n ───────────────────────────────────────────────────────── + +interface CountInput { + /** + * Caller-chosen discriminator. Becomes part of the input the engine + * hashes into the idempotency key, so two clicks with the same `runKey` + * dedup to the same task and a different `runKey` produces a fresh run. + */ + runKey: string; + n: number; + sleepMs: number; +} + +interface CountEvents extends Record { + tick: { value: number; total: number }; + recovered: { resumed_from: number }; +} + +// ── pipeline-with-steps ──────────────────────────────────────────────── + +interface PipelineInput { + runKey: string; + /** Per-stage delay (ms). Held above the stale threshold so that + * `simulateCrash` mid-stage exercises the cached-step recovery. */ + stageMs: number; +} + +interface ExtractResult { + sourceId: string; + rows: number; +} +interface TransformResult { + rows: number; + sum: number; +} +interface LoadResult { + rows: number; + sum: number; + destinationId: string; +} + +interface PipelineEvents extends Record { + stage_started: { stage: "extract" | "transform" | "load" }; + stage_completed: { + stage: "extract" | "transform" | "load"; + /** Payload is the cached return value of the step. */ + result: ExtractResult | TransformResult | LoadResult; + /** True when the engine returned the cached result instead of + * re-running the body — observable on recovery. */ + fromCache?: boolean; + }; +} + +// `step()` is a higher-order wrapper: it takes `(ctx, ...args) => +// Promise` and returns a memoized version with the same shape. The +// first invocation runs the body and writes the result into the WAL +// under a step-specific key (the function name); later invocations +// (replay or recovery) short-circuit to the cached value. We use the +// explicit-name overload so the WAL key is stable across minifiers +// and refactors that might rename the surrounding `const`. +const extract = step( + "extract", + async (_ctx: TaskContext, sourceId: string): Promise => { + return { sourceId, rows: 100 }; + }, +); + +const transform = step( + "transform", + async (_ctx: TaskContext, input: ExtractResult): Promise => { + let sum = 0; + for (let i = 0; i < input.rows; i++) sum += i; + return { rows: input.rows, sum }; + }, +); + +const load = step( + "load", + async ( + _ctx: TaskContext, + input: TransformResult, + destinationId: string, + ): Promise => { + return { ...input, destinationId }; + }, +); + +export class DurableTaskExamplePlugin extends Plugin { + static manifest = { + name: "durable-example", + displayName: "Durable Task Example", + description: + "Demonstrates this.executeTask + TaskFlow's two recovery patterns (manual via previousEvents and automatic via step()).", + resources: { required: [], optional: [] }, + } satisfies PluginManifest<"durable-example">; + + async setup(): Promise { + const taskflow = this.requireTaskflow(); + + taskflow.task({ + name: TASKS.countToN, + execute: (input, ctx) => this.countToN(input, ctx), + autoRecover: true, + }); + + taskflow.task({ + name: TASKS.pipeline, + execute: (input, ctx) => this.pipelineWithSteps(input, ctx), + autoRecover: true, + }); + } + + /** + * Count-to-N — manual recovery pattern. + * + * Body emits `tick` once per `sleepMs`. On recovery, scans the event + * log to find the last persisted `tick` and resumes from there. + * Demonstrates `ctx.previousEvents` + `ctx.isRecovery`. + */ + private async countToN( + input: CountInput, + ctx: TypedTaskContext, + ): Promise<{ final: number }> { + let start = 0; + + if (ctx.isRecovery) { + // Walk previousEvents from the tail looking for the last `tick`. + // `Array.prototype.findLast` would be cleaner but is ES2023; the + // root tsconfig targets ES2022 so we do the loop by hand. + let lastTick: (typeof ctx.previousEvents)[number] | undefined; + for (let i = ctx.previousEvents.length - 1; i >= 0; i--) { + const e = ctx.previousEvents[i]; + if (e?.eventType === "custom:tick") { + lastTick = e; + break; + } + } + const lastValue = + lastTick?.payload && typeof lastTick.payload === "object" + ? Number((lastTick.payload as CountEvents["tick"]).value) + : NaN; + if (Number.isFinite(lastValue)) { + start = lastValue + 1; + await ctx.emit("recovered", { resumed_from: start }); + } + } + + for (let value = start; value <= input.n; value++) { + await new Promise((resolve) => + setTimeout(resolve, Math.max(50, input.sleepMs)), + ); + await ctx.emit("tick", { value, total: input.n }); + } + + return { final: input.n }; + } + + /** + * Pipeline-with-steps — automatic recovery pattern. + * + * Three stages chained as `step()`s. After a crash: + * - Stages that already wrote their result to the WAL replay + * instantly with the cached value (no rerun). + * - The first stage that hadn't completed runs from scratch. + * - Subsequent stages run for the first time as usual. + * + * The `stage_completed` event carries `fromCache: true` for replays + * so the frontend can tell apart "ran fresh" from "resumed". + * `ctx.previousEvents` is consulted only for that UI cue — `step()` + * itself does the heavy lifting silently. + */ + private async pipelineWithSteps( + input: PipelineInput, + ctx: TypedTaskContext, + ): Promise { + const stageMs = Math.max(50, input.stageMs); + + const wasCompleted = (stage: PipelineEvents["stage_started"]["stage"]) => + ctx.isRecovery && + ctx.previousEvents.some( + (e) => + e?.eventType === "custom:stage_completed" && + (e?.payload as PipelineEvents["stage_completed"] | undefined) + ?.stage === stage, + ); + + await ctx.emit("stage_started", { stage: "extract" }); + const extractCached = wasCompleted("extract"); + if (!extractCached) await sleep(stageMs); + const extracted = await extract(ctx, input.runKey); + await ctx.emit("stage_completed", { + stage: "extract", + result: extracted, + fromCache: extractCached, + }); + + await ctx.emit("stage_started", { stage: "transform" }); + const transformCached = wasCompleted("transform"); + if (!transformCached) await sleep(stageMs); + const transformed = await transform(ctx, extracted); + await ctx.emit("stage_completed", { + stage: "transform", + result: transformed, + fromCache: transformCached, + }); + + await ctx.emit("stage_started", { stage: "load" }); + const loadCached = wasCompleted("load"); + if (!loadCached) await sleep(stageMs); + const loaded = await load(ctx, transformed, `dest-${input.runKey}`); + await ctx.emit("stage_completed", { + stage: "load", + result: loaded, + fromCache: loadCached, + }); + + return loaded; + } + + injectRoutes(router: IAppRouter): void { + this.route(router, { + name: "run", + method: "post", + path: "/run", + handler: async (req, res) => { + const input = parseCountInput(req.body); + // The engine derives the IK from sha256(taskName, input, userId). + // The client reads it from the `X-Taskflow-Idempotency-Key` + // response header (or the first SSE `ready` event). + await this.executeTask(res, TASKS.countToN, input); + }, + }); + + this.route(router, { + name: "run-pipeline", + method: "post", + path: "/run-pipeline", + handler: async (req, res) => { + const input = parsePipelineInput(req.body); + await this.executeTask(res, TASKS.pipeline, input); + }, + }); + + // `simulateCrash` is a test-mode primitive that aborts the executor + // mid-run. Exposing it on a public route in production would let any + // caller crash any in-flight task by guessing its IK (the IK is + // content-addressed, so colleagues who know the input can derive it). + // The dev-playground sets `enableTestMode: true` to demo recovery, + // which is itself dev-only. Gating the route here means a copy-paste + // of this plugin into a real app does not silently expose the crash + // verb if NODE_ENV is set. + this.route(router, { + name: "crash", + method: "post", + path: "/crash/:id", + handler: async (req, res) => { + if (process.env.NODE_ENV === "production") { + res.status(404).json({ error: "Not found" }); + return; + } + const id = String(req.params.id); + try { + this.requireTaskflow().simulateCrash(id); + res.json({ crashed: true, idempotencyKey: id }); + } catch (err) { + res.status(400).json({ error: errorMessage(err) }); + } + }, + }); + + this.route(router, { + name: "stop", + method: "post", + path: "/stop/:id", + handler: async (req, res) => { + const id = String(req.params.id); + const reason = + typeof req.body?.reason === "string" + ? req.body.reason + : "user_requested"; + try { + await this.requireTaskflow().stop(id, { reason }); + res.json({ stopped: true, idempotencyKey: id, reason }); + } catch (err) { + res.status(400).json({ error: errorMessage(err) }); + } + }, + }); + + this.route(router, { + name: "nudge-recovery", + method: "post", + path: "/nudge-recovery", + handler: async (req, res) => { + // Re-`start()` with the **same** input as the original run so the + // engine derives the same IK and re-spawns the existing + // (stale-Running) task. Without `userId` here for the same reason: + // `/run` and `/run-pipeline` go through `executeTask`, which + // resolves userId from the active `runInUserContext` scope (none + // here ⇒ `undefined`). Passing a userId derived from + // `x-forwarded-user` would produce a *different* IK and create a + // fresh task instead of nudging the existing one. + const taskName = parseTaskName(req.body?.taskName) ?? TASKS.countToN; + const input = + taskName === TASKS.pipeline + ? parsePipelineInput(req.body) + : parseCountInput(req.body); + try { + const handle = await this.requireTaskflow().start(taskName, input); + res.json({ + ok: true, + taskName, + idempotencyKey: handle.idempotencyKey, + }); + } catch (err) { + res.status(400).json({ error: errorMessage(err) }); + } + }, + }); + + this.route(router, { + name: "reattach", + method: "get", + path: "/reattach/:id", + handler: async (req, res) => { + // Bridge an SSE stream onto an existing run by IK. We don't use + // `executeTask` because the engine would derive a *new* IK from + // this (synthetic) input — instead we go straight to + // `subscribe` and bridge events ourselves, reusing + // `setupSseHeaders` / `writeSseFrame` so the wire format + // (headers + framing) stays identical to `executeTask`. + const id = String(req.params.id); + const taskflow = this.requireTaskflow(); + const lastSeq = parseLastEventId(req.header("last-event-id")); + + // Ownership / existence check: `taskflow.reconnect(ik, userId)` + // returns null when the IK is unknown or the userId does not + // match the task's recorded owner. Without this check, ANY + // caller who can guess an IK (the IK is content-addressed — + // colleagues can derive it from a known input shape) could + // open this stream and read the task's events. The demo + // intentionally has no auth in front of it, so we guard at the + // route level and 404 ambiguously rather than 403 to avoid + // confirming whether a given IK exists. + const userId = req.header("x-forwarded-user"); + const owner = await taskflow.reconnect(id, userId); + if (!owner) { + res.status(404).json({ error: "Not found" }); + return; + } + + setupSseHeaders(res); + writeSseFrame(res, { + event: "ready", + data: JSON.stringify({ idempotencyKey: id }), + }); + + let closed = false; + req.on("close", () => { + closed = true; + }); + + try { + for await (const evt of taskflow.subscribe(id, lastSeq)) { + if (closed || res.writableEnded) break; + const type = evt.event.eventType; + if (type === "heartbeat") continue; + if (typeof type === "string" && type.startsWith("custom:step:")) { + continue; + } + const isCustom = + typeof type === "string" && type.startsWith("custom:"); + const eventName = isCustom + ? type.slice("custom:".length) + : (type ?? "message"); + // Mirrors the `executeTask` bridge: refuse to forward a + // user-emitted event whose name collides with bridge wire + // vocabulary (would close the EventSource on the client + // while the engine keeps publishing). Inlined here because + // we are demonstrating a hand-rolled bridge. + if (isCustom && BRIDGE_RESERVED.has(eventName)) { + continue; + } + writeSseFrame(res, { + id: evt.streamSeq, + event: eventName, + data: JSON.stringify(evt.event.payload ?? {}), + }); + if ( + type === "completed" || + type === "failed" || + type === "cancelled" + ) { + break; + } + } + } finally { + if (!res.writableEnded) res.end(); + } + }, + }); + } +} + +export const durableTaskExample = toPlugin(DurableTaskExamplePlugin); + +// ── helpers ──────────────────────────────────────────────────────────── + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +function errorMessage(err: unknown): string { + return (err as { message?: string } | undefined)?.message ?? String(err); +} + +// We deliberately route through `Record` instead of +// `Partial`: the upstream `body: unknown` carries no +// guarantee that fields exist or have the expected types. `Partial` +// would lie to TypeScript that `b.n` is `number | undefined` when the +// runtime might hand us `"10"` or `[]`. Reading via the loose record +// keeps every access honestly typed as `unknown` and forces the +// per-field validation the callers below already do. +function asObject(body: unknown): Record { + return body !== null && typeof body === "object" + ? (body as Record) + : {}; +} + +function parseCountInput(body: unknown): CountInput { + const b = asObject(body); + const runKey = typeof b.runKey === "string" && b.runKey ? b.runKey : null; + return { + runKey: runKey ?? `run-${Date.now()}`, + n: Number.isFinite(b.n) ? Number(b.n) : 10, + sleepMs: Number.isFinite(b.sleepMs) ? Number(b.sleepMs) : 1000, + }; +} + +function parsePipelineInput(body: unknown): PipelineInput { + const b = asObject(body); + const runKey = typeof b.runKey === "string" && b.runKey ? b.runKey : null; + return { + runKey: runKey ?? `pipeline-${Date.now()}`, + stageMs: Number.isFinite(b.stageMs) ? Number(b.stageMs) : 2500, + }; +} + +function parseTaskName(value: unknown): TaskName | undefined { + if (value === TASKS.countToN || value === TASKS.pipeline) return value; + return undefined; +} + +function parseLastEventId(raw: string | undefined): number | undefined { + if (!raw) return undefined; + const parsed = parseInt(String(raw), 10); + return Number.isFinite(parsed) ? parsed : undefined; +} diff --git a/apps/dev-playground/server/index.ts b/apps/dev-playground/server/index.ts index 91179dacd..6fbf0581d 100644 --- a/apps/dev-playground/server/index.ts +++ b/apps/dev-playground/server/index.ts @@ -14,6 +14,7 @@ import { import { WorkspaceClient } from "@databricks/sdk-experimental"; // TODO: re-enable once vector-search is exported from @databricks/appkit // import { vectorSearch } from "@databricks/appkit"; +import { durableTaskExample } from "./durable-task-example-plugin"; import { lakebaseExamples } from "./lakebase-examples-plugin"; import { reconnect } from "./reconnect-plugin"; import { telemetryExamples } from "./telemetry-example-plugin"; @@ -50,9 +51,36 @@ const adminOnly: FilePolicy = (action, _resource, user) => { }; createApp({ + // TaskFlow is enabled by default with SQLite at .appkit/taskflow/tasks.db. + // We keep it on here so the durable-task-example plugin works out of the + // box. Pass `taskflow: false` to opt out. + // + // Tight timings for the durable-task demo: after `simulateCrash`, recovery + // only runs once the row is "stale" (no heartbeat for `staleThresholdMs`). + // Production defaults (~30s) make the crash demo feel broken; here we keep + // the threshold just above the heartbeat period so nudge + worker recover + // within a few seconds. + // + // Engine rule: heartbeat_interval_ms must be < stale_threshold_ms / 3 + // (avoids marking healthy tasks stale between heartbeats). + // + // `enableTestMode` opts into `simulateCrash` — required by the + // durable-task demo to exercise the recovery path. Production apps + // must leave this off (AppKit defaults it to `false`). + taskflow: { + engine: { + staleThresholdMs: 5000, + recoveryIntervalMs: 1000, + enableTestMode: true, + }, + executor: { + heartbeatIntervalMs: 1500, + }, + }, plugins: [ server(), reconnect(), + durableTaskExample(), telemetryExamples(), analytics({}), genie({ diff --git a/packages/appkit-ui/src/js/sse/connect-sse.test.ts b/packages/appkit-ui/src/js/sse/connect-sse.test.ts new file mode 100644 index 000000000..07e7949a3 --- /dev/null +++ b/packages/appkit-ui/src/js/sse/connect-sse.test.ts @@ -0,0 +1,220 @@ +import { afterEach, describe, expect, test, vi } from "vitest"; +import { connectSSE } from "./connect-sse"; +import type { SSEMessage } from "./types"; + +afterEach(() => { + vi.unstubAllGlobals(); +}); + +describe("connectSSE parser", () => { + test("captures id, event, and data fields from each frame", async () => { + vi.stubGlobal("fetch", () => + Promise.resolve( + sseResponse([ + 'id: 1\nevent: tick\ndata: {"value":1}\n\n', + 'id: 2\nevent: tick\ndata: {"value":2}\n\n', + ]), + ), + ); + + const seen: SSEMessage[] = []; + await connectSSE({ + url: "http://example.test/stream", + onMessage: async (m) => { + seen.push(m); + }, + maxRetries: 0, + }); + + expect(seen).toEqual([ + { id: "1", event: "tick", data: '{"value":1}' }, + { id: "2", event: "tick", data: '{"value":2}' }, + ]); + }); + + test("ignores comment lines (`: hb`) without surfacing a message", async () => { + vi.stubGlobal("fetch", () => + Promise.resolve( + sseResponse([": heartbeat\n\n", 'event: tick\ndata: {"v":1}\n\n']), + ), + ); + + const seen: SSEMessage[] = []; + await connectSSE({ + url: "http://example.test/stream", + onMessage: async (m) => { + seen.push(m); + }, + maxRetries: 0, + }); + + expect(seen).toHaveLength(1); + expect(seen[0]?.event).toBe("tick"); + }); + + test("preserves multi-line `data:` payloads joined by newline", async () => { + vi.stubGlobal("fetch", () => + Promise.resolve( + sseResponse(["event: log\ndata: line one\ndata: line two\n\n"]), + ), + ); + + const seen: SSEMessage[] = []; + await connectSSE({ + url: "http://example.test/stream", + onMessage: async (m) => { + seen.push(m); + }, + maxRetries: 0, + }); + + expect(seen[0]?.data).toBe("line one\nline two"); + }); + + test("emits empty `event` field when frame has no event line", async () => { + vi.stubGlobal("fetch", () => + Promise.resolve(sseResponse(['id: 7\ndata: {"x":1}\n\n'])), + ); + + const seen: SSEMessage[] = []; + await connectSSE({ + url: "http://example.test/stream", + onMessage: async (m) => { + seen.push(m); + }, + maxRetries: 0, + }); + + expect(seen[0]).toEqual({ id: "7", event: "", data: '{"x":1}' }); + }); + + test("reconnects on 502 by default and surfaces Last-Event-ID from latest frame", async () => { + const fetchSpy = vi + .fn() + .mockResolvedValueOnce(new Response(null, { status: 502 })) + .mockResolvedValueOnce(sseResponse(["id: 5\nevent: tick\ndata: ok\n\n"])); + vi.stubGlobal("fetch", fetchSpy); + + const seen: SSEMessage[] = []; + await connectSSE({ + url: "http://example.test/stream", + lastEventId: "5", + onMessage: async (m) => { + seen.push(m); + }, + maxRetries: 1, + // Production rejects retryDelay <= 0; use minimal delay for a fast retry. + retryDelay: 1, + }); + + expect(fetchSpy).toHaveBeenCalledTimes(2); + const retryInit = fetchSpy.mock.calls[1]?.[1] as RequestInit; + expect(new Headers(retryInit?.headers).get("last-event-id")).toBe("5"); + expect(seen).toEqual([{ id: "5", event: "tick", data: "ok" }]); + }); + + test("abort signal cancels mid-stream and onMessage stops being called", async () => { + const controller = new AbortController(); + vi.stubGlobal("fetch", (_url: string, init: RequestInit | undefined) => { + const stream = new ReadableStream({ + start(c) { + const enc = new TextEncoder(); + c.enqueue(enc.encode("event: first\ndata: one\n\n")); + init?.signal?.addEventListener( + "abort", + () => { + c.error(new DOMException("Aborted", "AbortError")); + }, + { once: true }, + ); + }, + }); + return Promise.resolve( + new Response(stream, { + status: 200, + headers: { "Content-Type": "text/event-stream" }, + }), + ); + }); + + const seen: SSEMessage[] = []; + const promise = connectSSE({ + url: "http://example.test/stream", + signal: controller.signal, + onMessage: async (m) => { + seen.push(m); + if (seen.length === 1) controller.abort(); + }, + maxRetries: 0, + }); + + await expect(promise).resolves.toBeUndefined(); + expect(seen).toHaveLength(1); + expect(seen[0]).toEqual({ id: "", event: "first", data: "one" }); + }); + + test("buffers a partial frame across chunk boundaries", async () => { + vi.stubGlobal("fetch", () => + Promise.resolve(sseResponse(['event: tick\ndata: {"x":', "1}\n\n"])), + ); + + const seen: SSEMessage[] = []; + await connectSSE({ + url: "http://example.test/stream", + onMessage: async (m) => { + seen.push(m); + }, + maxRetries: 0, + }); + + expect(seen).toEqual([{ id: "", event: "tick", data: '{"x":1}' }]); + }); + + test("treats CRLF line endings the same as LF", async () => { + vi.stubGlobal("fetch", () => + Promise.resolve(sseResponse(["event: tick\r\ndata: payload\r\n\r\n"])), + ); + + const seen: SSEMessage[] = []; + await connectSSE({ + url: "http://example.test/stream", + onMessage: async (m) => { + seen.push(m); + }, + maxRetries: 0, + }); + + expect(seen).toEqual([{ id: "", event: "tick", data: "payload" }]); + }); + + test("stops after maxRetries on persistent failure", async () => { + const fetchSpy = vi + .fn() + .mockRejectedValue(new Error("network")); + vi.stubGlobal("fetch", fetchSpy); + + const promise = connectSSE({ + url: "http://example.test/stream", + onMessage: async () => {}, + maxRetries: 2, + retryDelay: 1, + }); + + await expect(promise).resolves.toBeUndefined(); + expect(fetchSpy).toHaveBeenCalledTimes(3); + }); +}); + +function sseResponse(chunks: string[]): Response { + const stream = new ReadableStream({ + start(controller) { + const enc = new TextEncoder(); + for (const c of chunks) controller.enqueue(enc.encode(c)); + controller.close(); + }, + }); + return new Response(stream, { + status: 200, + headers: { "Content-Type": "text/event-stream" }, + }); +} diff --git a/packages/appkit-ui/src/js/sse/connect-sse.ts b/packages/appkit-ui/src/js/sse/connect-sse.ts index c4fd4500d..d097ea2f6 100644 --- a/packages/appkit-ui/src/js/sse/connect-sse.ts +++ b/packages/appkit-ui/src/js/sse/connect-sse.ts @@ -109,6 +109,7 @@ export async function connectSSE( onMessage({ id: lastEventId ?? "", data: message.data, + event: message.event, }); } } @@ -138,7 +139,13 @@ export async function connectSSE( } /** - * Parses a raw SSE event chunk into a message + * Parses a raw SSE event chunk into a message. + * + * Recognised fields per the SSE spec: `id:`, `event:`, `data:`. Lines + * starting with `:` are comments (e.g. wire-level keep-alives) and + * carry no event payload — chunks containing only comments yield + * `null`. + * * @param chunk - Raw SSE event block * @returns Parsed message or null when no data lines are present * @private @@ -148,6 +155,7 @@ function parseSSEEvent(chunk: string): SSEMessage | null { const lines = normalized.split("\n"); let id: string | undefined; + let event: string | undefined; const dataLines: string[] = []; for (const rawLine of lines) { @@ -158,6 +166,8 @@ function parseSSEEvent(chunk: string): SSEMessage | null { if (line.startsWith("id:")) { id = line.slice(3).trimStart(); + } else if (line.startsWith("event:")) { + event = line.slice(6).trimStart(); } else if (line.startsWith("data:")) { dataLines.push(line.slice(5).trimStart()); } @@ -167,6 +177,7 @@ function parseSSEEvent(chunk: string): SSEMessage | null { return { id: id ?? "", + event: event ?? "", data: dataLines.join("\n"), }; } diff --git a/packages/appkit-ui/src/js/sse/index.ts b/packages/appkit-ui/src/js/sse/index.ts index 94c683136..5daa5cf63 100644 --- a/packages/appkit-ui/src/js/sse/index.ts +++ b/packages/appkit-ui/src/js/sse/index.ts @@ -1,2 +1,3 @@ export * from "./connect-sse"; +export * from "./subscribe-taskflow-task"; export * from "./types"; diff --git a/packages/appkit-ui/src/js/sse/subscribe-taskflow-task.test.ts b/packages/appkit-ui/src/js/sse/subscribe-taskflow-task.test.ts new file mode 100644 index 000000000..c4b38b146 --- /dev/null +++ b/packages/appkit-ui/src/js/sse/subscribe-taskflow-task.test.ts @@ -0,0 +1,409 @@ +import { afterEach, describe, expect, test, vi } from "vitest"; +import { + subscribeToTaskflowTask, + TASKFLOW_IK_HEADER, +} from "./subscribe-taskflow-task"; + +afterEach(() => { + vi.unstubAllGlobals(); +}); + +interface CountEvents { + tick: { value: number; total: number }; + recovered: { resumed_from: number }; +} + +describe("subscribeToTaskflowTask", () => { + test("dispatches per-event handlers and surfaces idempotency key from header", async () => { + vi.stubGlobal("fetch", () => + Promise.resolve( + sseResponse({ + chunks: [ + 'event: ready\ndata: {"idempotencyKey":"ik-from-event"}\n\n', + 'id: 1\nevent: tick\ndata: {"value":1,"total":3}\n\n', + 'id: 2\nevent: tick\ndata: {"value":2,"total":3}\n\n', + 'id: 3\nevent: completed\ndata: {"final":3}\n\n', + ], + headers: { [TASKFLOW_IK_HEADER]: "ik-from-header" }, + }), + ), + ); + + const ticks: CountEvents["tick"][] = []; + let readyIK: string | null = null; + let completed: { final: number } | null = null; + + const result = await subscribeToTaskflowTask< + CountEvents, + { final: number } + >({ + url: "/run", + payload: { n: 3 }, + onReady: ({ idempotencyKey }) => { + readyIK = idempotencyKey; + }, + onEvent: { + tick: (p) => { + ticks.push(p); + }, + }, + onCompleted: (r) => { + completed = r; + }, + }); + + expect(readyIK).toBe("ik-from-header"); + expect(result.idempotencyKey).toBe("ik-from-header"); + expect(ticks).toEqual([ + { value: 1, total: 3 }, + { value: 2, total: 3 }, + ]); + expect(completed).toEqual({ final: 3 }); + }); + + test("falls back to `ready` event payload when no IK header is set", async () => { + vi.stubGlobal("fetch", () => + Promise.resolve( + sseResponse({ + chunks: [ + 'event: ready\ndata: {"idempotencyKey":"ik-from-event"}\n\n', + "event: completed\ndata: null\n\n", + ], + }), + ), + ); + + let readyIK: string | null = null; + const result = await subscribeToTaskflowTask({ + url: "/run", + onReady: ({ idempotencyKey }) => { + readyIK = idempotencyKey; + }, + }); + + expect(readyIK).toBe("ik-from-event"); + expect(result.idempotencyKey).toBe("ik-from-event"); + }); + + test("calls onFailed and stops on a `failed` terminal event", async () => { + vi.stubGlobal("fetch", () => + Promise.resolve( + sseResponse({ + chunks: [ + 'event: ready\ndata: {"idempotencyKey":"x"}\n\n', + 'event: failed\ndata: {"message":"boom"}\n\n', + // Anything after a terminal event must be ignored. + 'event: tick\ndata: {"value":99,"total":1}\n\n', + ], + }), + ), + ); + + const ticks: number[] = []; + let failedMessage: string | null = null; + await subscribeToTaskflowTask({ + url: "/run", + onEvent: { + tick: (p) => { + ticks.push(p.value); + }, + }, + onFailed: (m) => { + failedMessage = m; + }, + }); + + expect(failedMessage).toBe("boom"); + expect(ticks).toEqual([]); + }); + + test("calls onCancelled on a `cancelled` terminal event", async () => { + vi.stubGlobal("fetch", () => + Promise.resolve( + sseResponse({ + chunks: [ + 'event: ready\ndata: {"idempotencyKey":"x"}\n\n', + 'event: cancelled\ndata: {"reason":"user_requested"}\n\n', + ], + }), + ), + ); + + let cancelled = false; + await subscribeToTaskflowTask({ + url: "/run", + onCancelled: () => { + cancelled = true; + }, + }); + + expect(cancelled).toBe(true); + }); + + test("silently drops events the consumer didn't subscribe to", async () => { + vi.stubGlobal("fetch", () => + Promise.resolve( + sseResponse({ + chunks: [ + 'event: ready\ndata: {"idempotencyKey":"x"}\n\n', + 'event: tick\ndata: {"value":1,"total":1}\n\n', + 'event: recovered\ndata: {"resumed_from":0}\n\n', + "event: completed\ndata: null\n\n", + ], + }), + ), + ); + + const ticks: number[] = []; + await subscribeToTaskflowTask({ + url: "/run", + onEvent: { + tick: (p) => { + ticks.push(p.value); + }, + }, + }); + + expect(ticks).toEqual([1]); + }); + + test("ignores comment-only frames (heartbeats)", async () => { + vi.stubGlobal("fetch", () => + Promise.resolve( + sseResponse({ + chunks: [ + ": heartbeat\n\n", + 'event: ready\ndata: {"idempotencyKey":"x"}\n\n', + ": heartbeat\n\n", + 'event: tick\ndata: {"value":1,"total":1}\n\n', + "event: completed\ndata: null\n\n", + ], + }), + ), + ); + + const ticks: number[] = []; + await subscribeToTaskflowTask({ + url: "/run", + onEvent: { + tick: (p) => { + ticks.push(p.value); + }, + }, + }); + + expect(ticks).toEqual([1]); + }); + + test("does not call onError when the abort signal fires", async () => { + const controller = new AbortController(); + vi.stubGlobal("fetch", (_url: string, init: RequestInit | undefined) => { + const stream = new ReadableStream({ + start(c) { + const enc = new TextEncoder(); + c.enqueue( + enc.encode('event: ready\ndata: {"idempotencyKey":"x"}\n\n'), + ); + // Never close; the consumer aborts. + init?.signal?.addEventListener("abort", () => { + c.error(new DOMException("aborted", "AbortError")); + }); + }, + }); + return Promise.resolve( + new Response(stream, { + status: 200, + headers: { "Content-Type": "text/event-stream" }, + }), + ); + }); + + const errors: unknown[] = []; + const promise = subscribeToTaskflowTask({ + url: "/run", + signal: controller.signal, + onError: (e) => { + errors.push(e); + }, + }); + + // Give the stream loop a tick to enter `reader.read()`, then abort. + await Promise.resolve(); + controller.abort(); + + const result = await promise; + expect(errors).toEqual([]); + expect(result.idempotencyKey).toBe("x"); + }); + + test("calls onError on non-2xx HTTP status", async () => { + vi.stubGlobal("fetch", () => + Promise.resolve(new Response("nope", { status: 500 })), + ); + + const errors: unknown[] = []; + await subscribeToTaskflowTask({ + url: "/run", + onError: (e) => { + errors.push(e); + }, + }); + + expect(errors).toHaveLength(1); + expect(String(errors[0])).toContain("HTTP 500"); + }); + + test("issues GET when no payload is provided (reattach pattern)", async () => { + const fetchSpy = vi.fn(() => + Promise.resolve( + sseResponse({ + chunks: ["event: completed\ndata: null\n\n"], + }), + ), + ); + vi.stubGlobal("fetch", fetchSpy); + + await subscribeToTaskflowTask({ url: "/reattach/abc" }); + + expect(fetchSpy).toHaveBeenCalledOnce(); + const init = fetchSpy.mock.calls[0]?.[1]; + expect(init?.method).toBe("GET"); + expect(init?.body).toBeUndefined(); + }); + + test("forwards `lastEventId` as Last-Event-ID header", async () => { + const fetchSpy = vi.fn(() => + Promise.resolve( + sseResponse({ chunks: ["event: completed\ndata: null\n\n"] }), + ), + ); + vi.stubGlobal("fetch", fetchSpy); + + await subscribeToTaskflowTask({ + url: "/reattach/abc", + lastEventId: "42", + }); + + const init = fetchSpy.mock.calls[0]?.[1]; + const headers = init?.headers as Record; + expect(headers["Last-Event-ID"]).toBe("42"); + }); + + test('treats engine "completed" frame as terminal and resolves with payload', async () => { + vi.stubGlobal("fetch", () => + Promise.resolve( + sseResponse({ + chunks: [ + 'event: ready\ndata: {"idempotencyKey":"ik"}\n\n', + 'event: completed\ndata: {"result":42}\n\n', + ], + }), + ), + ); + + let result: { result: number } | null = null; + const promise = subscribeToTaskflowTask({ + url: "/run", + onCompleted: (r) => { + result = r; + }, + }); + + await expect(promise).resolves.toMatchObject({ idempotencyKey: "ik" }); + expect(result).toEqual({ result: 42 }); + }); + + test('treats engine "failed" frame as terminal and rejects with the error', async () => { + vi.stubGlobal("fetch", () => + Promise.resolve( + sseResponse({ + chunks: [ + 'event: ready\ndata: {"idempotencyKey":"ik"}\n\n', + 'event: failed\ndata: {"message":"boom"}\n\n', + ], + }), + ), + ); + + let failedMessage: string | null = null; + const promise = subscribeToTaskflowTask({ + url: "/run", + onFailed: (m) => { + failedMessage = m; + }, + }); + + await expect(promise).resolves.toMatchObject({ idempotencyKey: "ik" }); + expect(failedMessage).toBe("boom"); + }); + + test('forwards "cancelled" terminal correctly', async () => { + vi.stubGlobal("fetch", () => + Promise.resolve( + sseResponse({ + chunks: [ + 'event: ready\ndata: {"idempotencyKey":"ik"}\n\n', + "event: cancelled\ndata: {}\n\n", + ], + }), + ), + ); + + let cancelled = false; + const promise = subscribeToTaskflowTask({ + url: "/run", + onCancelled: () => { + cancelled = true; + }, + }); + + await expect(promise).resolves.toMatchObject({ idempotencyKey: "ik" }); + expect(cancelled).toBe(true); + }); + + test("drops `event: heartbeat` frames silently", async () => { + vi.stubGlobal("fetch", () => + Promise.resolve( + sseResponse({ + chunks: [ + 'event: ready\ndata: {"idempotencyKey":"x"}\n\n', + "event: heartbeat\ndata: {}\n\n", + 'event: tick\ndata: {"value":7,"total":10}\n\n', + "event: completed\ndata: null\n\n", + ], + }), + ), + ); + + const custom: number[] = []; + await subscribeToTaskflowTask({ + url: "/run", + onEvent: { + tick: (p) => { + custom.push(p.value); + }, + }, + }); + + expect(custom).toEqual([7]); + }); +}); + +interface SseResponseInit { + chunks: string[]; + headers?: Record; +} + +function sseResponse({ chunks, headers }: SseResponseInit): Response { + const stream = new ReadableStream({ + start(controller) { + const enc = new TextEncoder(); + for (const c of chunks) controller.enqueue(enc.encode(c)); + controller.close(); + }, + }); + return new Response(stream, { + status: 200, + headers: { "Content-Type": "text/event-stream", ...headers }, + }); +} diff --git a/packages/appkit-ui/src/js/sse/subscribe-taskflow-task.ts b/packages/appkit-ui/src/js/sse/subscribe-taskflow-task.ts new file mode 100644 index 000000000..5597acda9 --- /dev/null +++ b/packages/appkit-ui/src/js/sse/subscribe-taskflow-task.ts @@ -0,0 +1,390 @@ +/** + * `subscribeToTaskflowTask` — typed client for the AppKit TaskFlow SSE + * bridge. + * + * The bridge has a fixed wire shape: `event: ` / + * `data: ` / `id: `. Each frame the + * task body emits via `ctx.emit(name, payload)` becomes one SSE event + * with that name. Three event names are reserved by the bridge: + * + * - `ready` → bridge handshake. Emits `{ idempotencyKey }`. Always + * first. + * - `completed` → terminal success. Emits the task's return value. + * Stream closes after. + * - `failed` → terminal failure. Emits `{ message }`. Stream closes. + * - `cancelled` → terminal cancellation (cooperative `stop()` or + * client disconnect). Stream closes. + * + * Heartbeats are SSE comments (`: hb\n\n`) and never surface as events. + * + * Usage: + * + * ```ts + * type CountEvents = { + * tick: { value: number; total: number }; + * recovered: { resumed_from: number }; + * }; + * + * await subscribeToTaskflowTask({ + * url: "/api/durable-example/run", + * payload: { runKey, n, sleepMs }, + * signal: controller.signal, + * onReady: ({ idempotencyKey }) => setIK(idempotencyKey), + * onEvent: { + * tick: ({ value, total }) => setProgress({ value, total }), + * recovered: ({ resumed_from }) => log(`resumed at ${resumed_from}`), + * }, + * onCompleted: (result) => log("done", result), + * onFailed: (msg) => log("failed", msg), + * }); + * ``` + */ + +import type { SSEMessage } from "./types"; + +/** + * HTTP response header carrying the engine-derived idempotency key. + * + * **Duplicated on purpose** with the server-side definition in + * `@databricks/appkit` (`packages/appkit/src/taskflow/execute-task.ts`). + * `appkit-ui` is a separate npm package (browser/runtime) and cannot + * pull a constant from `appkit` (Node-only, would drag the entire + * server bundle into the browser). The two MUST stay in lockstep — + * change one, change the other. There is a contract test in + * `subscribe-taskflow-task.test.ts` that asserts the literal string. + */ +export const TASKFLOW_IK_HEADER = "X-Taskflow-Idempotency-Key"; + +/** + * Per-event handler map. Each key is an event name from `TEvents`; the + * value is a callback receiving the decoded payload. Handlers may + * return `void` or a `Promise` — the subscriber awaits each one + * sequentially so the UI stays in sync with the wire order. + * + * **Constraint asymmetry note (deliberate):** the server-side `TEvents` + * generic on `TaskDefinition` extends `Record` (the + * engine writes the payload into the WAL as JSON; an indexable object + * is the cheapest way to type that). The client side here has no + * `extends` constraint so callers can pass an `interface CountEvents + * { tick: ...; recovered: ...; }` directly — interfaces don't have + * implicit string index signatures, so requiring `extends Record` + * would force `type` aliases everywhere. Both sides interoperate + * through the wire payload, which is structurally compatible with + * either constraint. + */ +export type TaskflowEventHandlers = { + [K in keyof TEvents]?: (payload: TEvents[K]) => void | Promise; +}; + +export interface SubscribeToTaskflowTaskOptions< + TEvents = Record, + TResult = unknown, +> { + /** SSE endpoint — the same route the bridge writes to. */ + url: string; + /** + * Optional request body. When set, the helper issues `POST` with + * `Content-Type: application/json`; otherwise `GET`. Use `GET` for + * reattach routes that resolve a stream by URL parameter. + */ + payload?: unknown; + /** Optional headers; merged with the helper's defaults. */ + headers?: HeadersInit; + /** + * Last seen `id:` to resume from on a reconnect. Sent as + * `Last-Event-ID`. The TaskFlow bridge maps it to the engine's + * `streamSeq`, which the WAL uses to replay any frames the client + * missed. + */ + lastEventId?: string; + /** Abort signal to cancel the underlying fetch. */ + signal?: AbortSignal; + /** + * Called once with the engine-derived idempotency key. Surfaced + * twice by the bridge: as the `X-Taskflow-Idempotency-Key` response + * header (read first if same-origin), and as the `ready` SSE event + * (read by EventSource clients that can't see headers). + */ + onReady?: (info: { idempotencyKey: string }) => void; + /** Per-event handlers, typed by the `TEvents` map. */ + onEvent?: TaskflowEventHandlers; + /** + * Terminal success. Receives the task's return value; the parser + * never throws if the payload doesn't match `TResult`. + */ + onCompleted?: (result: TResult) => void; + /** Terminal failure. Receives the error message produced by the bridge. */ + onFailed?: (message: string) => void; + /** Terminal cancellation (cooperative `stop()` or client disconnect). */ + onCancelled?: () => void; + /** + * Network or parsing error. Not called when the abort signal fires — + * an explicit cancel is not an error. + */ + onError?: (error: unknown) => void; + /** + * Maximum buffered SSE bytes before the helper aborts with an + * error. Defaults to 1MB. + */ + maxBufferSize?: number; +} + +export interface SubscribeToTaskflowTaskResult { + /** + * The idempotency key surfaced by the bridge — either via the + * `X-Taskflow-Idempotency-Key` response header (preferred) or the + * first `ready` event. `null` if the stream ended before the bridge + * sent either, e.g. because the request errored before headers + * flushed. + */ + idempotencyKey: string | null; +} + +/** + * Subscribes to a TaskFlow task SSE stream. Returns when the stream + * ends (terminal event, abort, or network close). + */ +export async function subscribeToTaskflowTask< + TEvents = Record, + TResult = unknown, +>( + options: SubscribeToTaskflowTaskOptions, +): Promise { + const { + url, + payload, + headers: extraHeaders, + lastEventId, + signal, + onReady, + onEvent, + onCompleted, + onFailed, + onCancelled, + onError, + maxBufferSize = 1024 * 1024, + } = options; + + if (!url || url.trim().length === 0) { + throw new Error( + "subscribeToTaskflowTask: 'url' must be a non-empty string", + ); + } + + const hasPayload = typeof payload !== "undefined"; + const baseHeaders: Record = { + Accept: "text/event-stream", + }; + if (hasPayload) baseHeaders["Content-Type"] = "application/json"; + if (lastEventId) baseHeaders["Last-Event-ID"] = lastEventId; + + const headers = mergeHeaders(baseHeaders, extraHeaders); + + let idempotencyKey: string | null = null; + let readyEmitted = false; + + // The consumer's `onReady` runs at most once with the consolidated + // IK. The header is the source of truth (the bridge always sets it + // before flushing the body); the `ready` event is a fallback for + // EventSource clients that can't read response headers and for + // cross-origin requests where the browser hides custom headers. + const emitReady = (ikFromBody?: string) => { + if (readyEmitted) return; + if (!idempotencyKey && ikFromBody) idempotencyKey = ikFromBody; + if (!idempotencyKey) return; + readyEmitted = true; + onReady?.({ idempotencyKey }); + }; + + const result: SubscribeToTaskflowTaskResult = { + get idempotencyKey() { + return idempotencyKey; + }, + }; + + try { + const response = await fetch(url, { + method: hasPayload ? "POST" : "GET", + headers, + body: hasPayload ? JSON.stringify(payload) : undefined, + signal, + }); + + if (!response.ok) { + throw new Error(`HTTP ${response.status}`); + } + if (!response.body) { + throw new Error("No response body"); + } + + const headerIK = response.headers.get(TASKFLOW_IK_HEADER); + if (headerIK) { + idempotencyKey = headerIK; + emitReady(); + } + + const reader = response.body.getReader(); + const decoder = new TextDecoder("utf-8"); + let buffer = ""; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + const decoded = decoder.decode(value, { stream: true }); + if (buffer.length + decoded.length > maxBufferSize) { + throw new Error("subscribeToTaskflowTask: buffer size exceeded"); + } + buffer += decoded; + + const normalized = buffer.replace(/\r\n/g, "\n"); + const parts = normalized.split("\n\n"); + buffer = parts.pop() ?? ""; + + for (const part of parts) { + const message = parseSseFrame(part); + if (!message) continue; + + const { stop } = await dispatchMessage(message, { + onReady: (info) => emitReady(info.idempotencyKey), + onEvent, + onCompleted, + onFailed, + onCancelled, + }); + + if (stop) { + // Terminal event seen — stop reading. The server will close + // the underlying stream as well, but bailing here releases + // the reader immediately so the caller can move on. + await reader.cancel().catch(() => {}); + return result; + } + } + } + + return result; + } catch (error) { + if (signal?.aborted) { + // Explicit cancel from the caller — not an error path. + return result; + } + onError?.(error); + return result; + } +} + +// ── internals ────────────────────────────────────────────────────────── + +interface DispatchHandlers { + onReady?: (info: { idempotencyKey: string }) => void; + onEvent?: TaskflowEventHandlers; + onCompleted?: (result: TResult) => void; + onFailed?: (message: string) => void; + onCancelled?: () => void; +} + +async function dispatchMessage( + message: SSEMessage, + handlers: DispatchHandlers, +): Promise<{ stop: boolean }> { + const eventName = message.event || "message"; + const payload = parseJson(message.data); + + if (eventName === "ready") { + if (payload && typeof payload === "object" && "idempotencyKey" in payload) { + handlers.onReady?.({ + idempotencyKey: String( + (payload as { idempotencyKey: unknown }).idempotencyKey, + ), + }); + } + return { stop: false }; + } + + if (eventName === "completed") { + handlers.onCompleted?.(payload as TResult); + return { stop: true }; + } + + if (eventName === "failed" || eventName === "error") { + const msg = + payload && typeof payload === "object" && "message" in payload + ? String((payload as { message: unknown }).message) + : eventName; + handlers.onFailed?.(msg); + return { stop: true }; + } + + if (eventName === "cancelled") { + handlers.onCancelled?.(); + return { stop: true }; + } + + // Custom event from `ctx.emit(name, payload)`. The handler map is + // optional per name — frames the consumer doesn't subscribe to are + // silently dropped (matches the at-most-once-per-handler shape). + const handler = handlers.onEvent?.[eventName as keyof TEvents] as + | ((p: unknown) => void | Promise) + | undefined; + if (handler) await handler(payload); + + return { stop: false }; +} + +/** + * Parses one SSE chunk. Mirrors the parser used by `connectSSE` so the + * two helpers stay aligned on field handling (id, event, data, comment + * lines). + */ +function parseSseFrame(chunk: string): SSEMessage | null { + const lines = chunk.replace(/\r\n/g, "\n").split("\n"); + + let id: string | undefined; + let event: string | undefined; + const dataLines: string[] = []; + + for (const rawLine of lines) { + const line = rawLine.trimEnd(); + if (line === "" || line.startsWith(":")) continue; + + if (line.startsWith("id:")) id = line.slice(3).trimStart(); + else if (line.startsWith("event:")) event = line.slice(6).trimStart(); + else if (line.startsWith("data:")) + dataLines.push(line.slice(5).trimStart()); + } + + if (dataLines.length === 0) return null; + return { + id: id ?? "", + event: event ?? "", + data: dataLines.join("\n"), + }; +} + +function parseJson(raw: string): unknown { + if (!raw) return null; + try { + return JSON.parse(raw); + } catch { + return null; + } +} + +function mergeHeaders( + base: Record, + extra: HeadersInit | undefined, +): HeadersInit { + if (!extra) return base; + const out: Record = { ...base }; + if (extra instanceof Headers) { + extra.forEach((value, key) => { + out[key] = value; + }); + } else if (Array.isArray(extra)) { + for (const [k, v] of extra) out[k] = v; + } else { + Object.assign(out, extra); + } + return out; +} diff --git a/packages/appkit-ui/src/js/sse/types.ts b/packages/appkit-ui/src/js/sse/types.ts index 7b55df8d3..b301c7b8d 100644 --- a/packages/appkit-ui/src/js/sse/types.ts +++ b/packages/appkit-ui/src/js/sse/types.ts @@ -4,6 +4,13 @@ export type SSEMessage = { id: string; /** Raw data payload */ data: string; + /** + * SSE `event:` field name when present (e.g. "tick", "completed"). + * Empty string for unnamed/default events. Frames produced by the + * AppKit TaskFlow bridge always carry an `event` name; consumers of + * generic SSE may leave this unset. + */ + event: string; }; /** Options for opening a resilient SSE connection */