feat(dev-playground): durable-task example + typed SSE client helper#380
Draft
ditadi wants to merge 1 commit into
Draft
feat(dev-playground): durable-task example + typed SSE client helper#380ditadi wants to merge 1 commit into
ditadi wants to merge 1 commit into
Conversation
This was referenced May 12, 2026
fdf81b8 to
fe71e3a
Compare
c88f075 to
bcda23c
Compare
The reference implementation for plugin authors. A demo plugin
covering both TaskFlow recovery patterns (manual via `ctx.previousEvents`
and structural via `step()`), a typed frontend SSE consumer
(`subscribeToTaskflowTask<TEvents>`), and the `connectSSE` parser
extension that captures `event:` field names. Bottom-up teaching:
server plugin → bridge → client helper → React UI.
Server demo plugin (`apps/dev-playground/server/durable-task-example-plugin.ts`):
- `count-to-n` task — manual recovery via `ctx.previousEvents`.
Ticks once per `sleepMs`, emitting typed `tick` events. On
recovery, scans the event log to find the last persisted tick and
resumes from there. The pattern for "checkpoint is the last time
I emitted X" with no expensive computation to memoize.
- `pipeline-with-steps` task — automatic recovery via `step()`.
Wraps each stage (extract → transform → load) with `step()`,
which memoizes its result in the WAL the first time it runs. On
recovery, completed stages return the cached value without
re-executing. The pattern for stages that are expensive (LLM
calls, large queries) and unsafe to replay.
- Routes (mounted under `/api/durable-example`):
- `POST /run`, `POST /run-pipeline` — start + bridge SSE via
`executeTask`.
- `POST /crash/:id` — `simulateCrash` (gated behind
`NODE_ENV !== "production"`).
- `POST /stop/:id` — cooperative `taskflow.stop({ reason })`.
- `POST /nudge-recovery` — re-submits the original input so the
same IK triggers stale-Running recovery (`engine.resume()` only
applies to Suspended tasks, so the demo "nudges" the engine).
- `GET /reattach/:id` — bridges an SSE stream onto an existing
task by IK via `subscribe()` + `setupSseHeaders` + `writeSseFrame`
directly (the `executeTask` path would derive a new IK).
Performs an OBO ownership check via `asUser(req).reconnect(id,
userId)` before subscribing (F11 fix).
- Registers with `enableTestMode: true` so `simulateCrash` is
available; the route handler additionally gates on `NODE_ENV` so
a misconfigured production deployment can't crash live tasks.
`apps/dev-playground/server/index.ts`:
- Registers the demo plugin and enables test mode on the TaskFlow
config (`taskflow: { engine: { enableTestMode: true } }`) so
`simulateCrash` is callable from the demo route.
Client React route (`apps/dev-playground/client/src/routes/durable-task.route.tsx`):
- Exercises both tasks end-to-end: `POST /run` then opens an SSE
stream via `subscribeToTaskflowTask<CountEvents>`. Renders `tick`
/ `recovered` events for `count-to-n`; renders `stage_started` /
`stage_done` / `recovered` for `pipeline-with-steps` (which
surfaces "from cache" on recovered stages).
- Buttons for Stop, Crash, Nudge, and Reattach exercise the full
cancellation / crash / recovery / re-attach loop.
- Adds nav entries in `__root.tsx`, `index.tsx`, and the
TanStack-generated `routeTree.gen.ts`.
Typed client helper (`packages/appkit-ui/src/js/sse/subscribe-taskflow-task.ts`):
- `subscribeToTaskflowTask<TEvents>(url, { onEvent, onComplete,
onError, signal? })` — typed async API consuming the AppKit SSE
bridge. Each `event: <name>` frame is dispatched to `onEvent[name]`
with `payload` typed as `TEvents[name]`.
- Terminal events (`completed`, `failed`, `cancelled`) resolve /
reject the returned promise so plugins can `await` the durable
run without an event handler.
- `Last-Event-ID` reconnection: the helper tracks the highest seen
`id:` frame and reattaches with that header on transient network
failure. Tests assert the reconnect math is correct.
- Includes tests for happy-path streaming, terminal events, abort
via `AbortSignal`, and Last-Event-ID reconnect.
`connect-sse` extension (`packages/appkit-ui/src/js/sse/connect-sse.ts`,
`types.ts`, `index.ts`):
- The generic SSE parser captures `event:` field names alongside
`data:` payloads. `SSEMessage` gains `event?: string` so any
AppKit SSE consumer can inspect the event name without re-parsing.
Tests cover multi-line `data:` joining, CRLF normalisation, and
comment-frame handling.
- Export the new typed helper from `index.ts`.
Gitignore:
- `apps/dev-playground/.gitignore` adds `tasks.*` / `*.wal` patterns
as a defensive belt-and-braces around the existing `.appkit/`
exclusion. The demo plugin may configure storage at the playground
root for diagnostics; the additional patterns keep `tasks.db` and
the rotating WAL out of git regardless of `databasePath`.
Verify:
- `pnpm -r typecheck`, `pnpm build`, `pnpm test` (125 files, 2304
tests) all green.
- `pnpm exec biome check` clean on touched files.
- `pnpm exec knip` clean.
Risk. Demo plugin is unauthenticated by design (it ships with the
dev playground, not the SDK). `/crash/:id` returns 404 in production
via `NODE_ENV` gate; `enableTestMode` flips on `simulateCrash`. The
demo route handlers do not enforce auth — they assume the playground
sits behind the Databricks Apps proxy. Document in deployment notes.
Not in this PR. No production-plugin changes. No doc rewrite — that's
PR 7. The `subscribeToTaskflowTask` helper currently requires plugin
authors to redeclare `TEvents` client-side; a future follow-up (F26)
would derive it from the registered `TaskHandle`.
Stacked on: stack/taskflow/analytics-migration.
Signed-off-by: ditadi <victordperd@gmail.com>
fe71e3a to
c02bcd9
Compare
bcda23c to
9a2926d
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
🥞 Stacked PR
Use this link to review incremental changes.
The reference implementation for plugin authors. A demo plugin
covering both TaskFlow recovery patterns (manual via
ctx.previousEventsand structural via
step()), a typed frontend SSE consumer(
subscribeToTaskflowTask<TEvents>), and theconnectSSEparserextension that captures
event:field names. Bottom-up teaching:server plugin → bridge → client helper → React UI.
Server demo plugin (
apps/dev-playground/server/durable-task-example-plugin.ts):count-to-ntask — manual recovery viactx.previousEvents.Ticks once per
sleepMs, emitting typedtickevents. Onrecovery, scans the event log to find the last persisted tick and
resumes from there. The pattern for "checkpoint is the last time
I emitted X" with no expensive computation to memoize.
pipeline-with-stepstask — automatic recovery viastep().Wraps each stage (extract → transform → load) with
step(),which memoizes its result in the WAL the first time it runs. On
recovery, completed stages return the cached value without
re-executing. The pattern for stages that are expensive (LLM
calls, large queries) and unsafe to replay.
/api/durable-example):POST /run,POST /run-pipeline— start + bridge SSE viaexecuteTask.POST /crash/:id—simulateCrash(gated behindNODE_ENV !== "production").POST /stop/:id— cooperativetaskflow.stop({ reason }).POST /nudge-recovery— re-submits the original input so thesame IK triggers stale-Running recovery (
engine.resume()onlyapplies to Suspended tasks, so the demo "nudges" the engine).
GET /reattach/:id— bridges an SSE stream onto an existingtask by IK via
subscribe()+setupSseHeaders+writeSseFramedirectly (the
executeTaskpath would derive a new IK).Performs an OBO ownership check via
asUser(req).reconnect(id, userId)before subscribing (F11 fix).enableTestMode: truesosimulateCrashisavailable; the route handler additionally gates on
NODE_ENVsoa misconfigured production deployment can't crash live tasks.
apps/dev-playground/server/index.ts:config (
taskflow: { engine: { enableTestMode: true } }) sosimulateCrashis callable from the demo route.Client React route (
apps/dev-playground/client/src/routes/durable-task.route.tsx):POST /runthen opens an SSEstream via
subscribeToTaskflowTask<CountEvents>. Renderstick/
recoveredevents forcount-to-n; rendersstage_started/stage_done/recoveredforpipeline-with-steps(whichsurfaces "from cache" on recovered stages).
cancellation / crash / recovery / re-attach loop.
__root.tsx,index.tsx, and theTanStack-generated
routeTree.gen.ts.Typed client helper (
packages/appkit-ui/src/js/sse/subscribe-taskflow-task.ts):subscribeToTaskflowTask<TEvents>(url, { onEvent, onComplete, onError, signal? })— typed async API consuming the AppKit SSEbridge. Each
event: <name>frame is dispatched toonEvent[name]with
payloadtyped asTEvents[name].completed,failed,cancelled) resolve /reject the returned promise so plugins can
awaitthe durablerun without an event handler.
Last-Event-IDreconnection: the helper tracks the highest seenid:frame and reattaches with that header on transient networkfailure. Tests assert the reconnect math is correct.
via
AbortSignal, and Last-Event-ID reconnect.connect-sseextension (packages/appkit-ui/src/js/sse/connect-sse.ts,types.ts,index.ts):event:field names alongsidedata:payloads.SSEMessagegainsevent?: stringso anyAppKit SSE consumer can inspect the event name without re-parsing.
Tests cover multi-line
data:joining, CRLF normalisation, andcomment-frame handling.
index.ts.Gitignore:
apps/dev-playground/.gitignoreaddstasks.*/*.walpatternsas a defensive belt-and-braces around the existing
.appkit/exclusion. The demo plugin may configure storage at the playground
root for diagnostics; the additional patterns keep
tasks.dbandthe rotating WAL out of git regardless of
databasePath.Verify:
pnpm -r typecheck,pnpm build,pnpm test(125 files, 2304tests) all green.
pnpm exec biome checkclean on touched files.pnpm exec knipclean.Risk. Demo plugin is unauthenticated by design (it ships with the
dev playground, not the SDK).
/crash/:idreturns 404 in productionvia
NODE_ENVgate;enableTestModeflips onsimulateCrash. Thedemo route handlers do not enforce auth — they assume the playground
sits behind the Databricks Apps proxy. Document in deployment notes.
Not in this PR. No production-plugin changes. No doc rewrite — that's
PR 7. The
subscribeToTaskflowTaskhelper currently requires pluginauthors to redeclare
TEventsclient-side; a future follow-up (F26)would derive it from the registered
TaskHandle.Stacked on: stack/taskflow/analytics-migration.
Signed-off-by: ditadi victordperd@gmail.com