Skip to content

feat(dev-playground): durable-task example + typed SSE client helper#380

Draft
ditadi wants to merge 1 commit into
stack/taskflow/analytics-migrationfrom
stack/taskflow/durable-task-demo
Draft

feat(dev-playground): durable-task example + typed SSE client helper#380
ditadi wants to merge 1 commit into
stack/taskflow/analytics-migrationfrom
stack/taskflow/durable-task-demo

Conversation

@ditadi
Copy link
Copy Markdown
Contributor

@ditadi ditadi commented May 12, 2026

🥞 Stacked PR

Use this link to review incremental changes.


The reference implementation for plugin authors. A demo plugin
covering both TaskFlow recovery patterns (manual via ctx.previousEvents
and structural via step()), a typed frontend SSE consumer
(subscribeToTaskflowTask<TEvents>), and the connectSSE parser
extension that captures event: field names. Bottom-up teaching:
server plugin → bridge → client helper → React UI.

Server demo plugin (apps/dev-playground/server/durable-task-example-plugin.ts):

  • count-to-n task — manual recovery via ctx.previousEvents.
    Ticks once per sleepMs, emitting typed tick events. On
    recovery, scans the event log to find the last persisted tick and
    resumes from there. The pattern for "checkpoint is the last time
    I emitted X" with no expensive computation to memoize.
  • pipeline-with-steps task — automatic recovery via step().
    Wraps each stage (extract → transform → load) with step(),
    which memoizes its result in the WAL the first time it runs. On
    recovery, completed stages return the cached value without
    re-executing. The pattern for stages that are expensive (LLM
    calls, large queries) and unsafe to replay.
  • Routes (mounted under /api/durable-example):
    • POST /run, POST /run-pipeline — start + bridge SSE via
      executeTask.
    • POST /crash/:idsimulateCrash (gated behind
      NODE_ENV !== "production").
    • POST /stop/:id — cooperative taskflow.stop({ reason }).
    • POST /nudge-recovery — re-submits the original input so the
      same IK triggers stale-Running recovery (engine.resume() only
      applies to Suspended tasks, so the demo "nudges" the engine).
    • GET /reattach/:id — bridges an SSE stream onto an existing
      task by IK via subscribe() + setupSseHeaders + writeSseFrame
      directly (the executeTask path would derive a new IK).
      Performs an OBO ownership check via asUser(req).reconnect(id, userId) before subscribing (F11 fix).
  • Registers with enableTestMode: true so simulateCrash is
    available; the route handler additionally gates on NODE_ENV so
    a misconfigured production deployment can't crash live tasks.

apps/dev-playground/server/index.ts:

  • Registers the demo plugin and enables test mode on the TaskFlow
    config (taskflow: { engine: { enableTestMode: true } }) so
    simulateCrash is callable from the demo route.

Client React route (apps/dev-playground/client/src/routes/durable-task.route.tsx):

  • Exercises both tasks end-to-end: POST /run then opens an SSE
    stream via subscribeToTaskflowTask<CountEvents>. Renders tick
    / recovered events for count-to-n; renders stage_started /
    stage_done / recovered for pipeline-with-steps (which
    surfaces "from cache" on recovered stages).
  • Buttons for Stop, Crash, Nudge, and Reattach exercise the full
    cancellation / crash / recovery / re-attach loop.
  • Adds nav entries in __root.tsx, index.tsx, and the
    TanStack-generated routeTree.gen.ts.

Typed client helper (packages/appkit-ui/src/js/sse/subscribe-taskflow-task.ts):

  • subscribeToTaskflowTask<TEvents>(url, { onEvent, onComplete, onError, signal? }) — typed async API consuming the AppKit SSE
    bridge. Each event: <name> frame is dispatched to onEvent[name]
    with payload typed as TEvents[name].
  • Terminal events (completed, failed, cancelled) resolve /
    reject the returned promise so plugins can await the durable
    run without an event handler.
  • Last-Event-ID reconnection: the helper tracks the highest seen
    id: frame and reattaches with that header on transient network
    failure. Tests assert the reconnect math is correct.
  • Includes tests for happy-path streaming, terminal events, abort
    via AbortSignal, and Last-Event-ID reconnect.

connect-sse extension (packages/appkit-ui/src/js/sse/connect-sse.ts,
types.ts, index.ts):

  • The generic SSE parser captures event: field names alongside
    data: payloads. SSEMessage gains event?: string so any
    AppKit SSE consumer can inspect the event name without re-parsing.
    Tests cover multi-line data: joining, CRLF normalisation, and
    comment-frame handling.
  • Export the new typed helper from index.ts.

Gitignore:

  • apps/dev-playground/.gitignore adds tasks.* / *.wal patterns
    as a defensive belt-and-braces around the existing .appkit/
    exclusion. The demo plugin may configure storage at the playground
    root for diagnostics; the additional patterns keep tasks.db and
    the rotating WAL out of git regardless of databasePath.

Verify:

  • pnpm -r typecheck, pnpm build, pnpm test (125 files, 2304
    tests) all green.
  • pnpm exec biome check clean on touched files.
  • pnpm exec knip clean.

Risk. Demo plugin is unauthenticated by design (it ships with the
dev playground, not the SDK). /crash/:id returns 404 in production
via NODE_ENV gate; enableTestMode flips on simulateCrash. The
demo route handlers do not enforce auth — they assume the playground
sits behind the Databricks Apps proxy. Document in deployment notes.

Not in this PR. No production-plugin changes. No doc rewrite — that's
PR 7. The subscribeToTaskflowTask helper currently requires plugin
authors to redeclare TEvents client-side; a future follow-up (F26)
would derive it from the registered TaskHandle.

Stacked on: stack/taskflow/analytics-migration.

Signed-off-by: ditadi victordperd@gmail.com

The reference implementation for plugin authors. A demo plugin
covering both TaskFlow recovery patterns (manual via `ctx.previousEvents`
and structural via `step()`), a typed frontend SSE consumer
(`subscribeToTaskflowTask<TEvents>`), and the `connectSSE` parser
extension that captures `event:` field names. Bottom-up teaching:
server plugin → bridge → client helper → React UI.

Server demo plugin (`apps/dev-playground/server/durable-task-example-plugin.ts`):

- `count-to-n` task — manual recovery via `ctx.previousEvents`.
  Ticks once per `sleepMs`, emitting typed `tick` events. On
  recovery, scans the event log to find the last persisted tick and
  resumes from there. The pattern for "checkpoint is the last time
  I emitted X" with no expensive computation to memoize.
- `pipeline-with-steps` task — automatic recovery via `step()`.
  Wraps each stage (extract → transform → load) with `step()`,
  which memoizes its result in the WAL the first time it runs. On
  recovery, completed stages return the cached value without
  re-executing. The pattern for stages that are expensive (LLM
  calls, large queries) and unsafe to replay.
- Routes (mounted under `/api/durable-example`):
  - `POST /run`, `POST /run-pipeline` — start + bridge SSE via
    `executeTask`.
  - `POST /crash/:id` — `simulateCrash` (gated behind
    `NODE_ENV !== "production"`).
  - `POST /stop/:id` — cooperative `taskflow.stop({ reason })`.
  - `POST /nudge-recovery` — re-submits the original input so the
    same IK triggers stale-Running recovery (`engine.resume()` only
    applies to Suspended tasks, so the demo "nudges" the engine).
  - `GET /reattach/:id` — bridges an SSE stream onto an existing
    task by IK via `subscribe()` + `setupSseHeaders` + `writeSseFrame`
    directly (the `executeTask` path would derive a new IK).
    Performs an OBO ownership check via `asUser(req).reconnect(id,
    userId)` before subscribing (F11 fix).
- Registers with `enableTestMode: true` so `simulateCrash` is
  available; the route handler additionally gates on `NODE_ENV` so
  a misconfigured production deployment can't crash live tasks.

`apps/dev-playground/server/index.ts`:

- Registers the demo plugin and enables test mode on the TaskFlow
  config (`taskflow: { engine: { enableTestMode: true } }`) so
  `simulateCrash` is callable from the demo route.

Client React route (`apps/dev-playground/client/src/routes/durable-task.route.tsx`):

- Exercises both tasks end-to-end: `POST /run` then opens an SSE
  stream via `subscribeToTaskflowTask<CountEvents>`. Renders `tick`
  / `recovered` events for `count-to-n`; renders `stage_started` /
  `stage_done` / `recovered` for `pipeline-with-steps` (which
  surfaces "from cache" on recovered stages).
- Buttons for Stop, Crash, Nudge, and Reattach exercise the full
  cancellation / crash / recovery / re-attach loop.
- Adds nav entries in `__root.tsx`, `index.tsx`, and the
  TanStack-generated `routeTree.gen.ts`.

Typed client helper (`packages/appkit-ui/src/js/sse/subscribe-taskflow-task.ts`):

- `subscribeToTaskflowTask<TEvents>(url, { onEvent, onComplete,
  onError, signal? })` — typed async API consuming the AppKit SSE
  bridge. Each `event: <name>` frame is dispatched to `onEvent[name]`
  with `payload` typed as `TEvents[name]`.
- Terminal events (`completed`, `failed`, `cancelled`) resolve /
  reject the returned promise so plugins can `await` the durable
  run without an event handler.
- `Last-Event-ID` reconnection: the helper tracks the highest seen
  `id:` frame and reattaches with that header on transient network
  failure. Tests assert the reconnect math is correct.
- Includes tests for happy-path streaming, terminal events, abort
  via `AbortSignal`, and Last-Event-ID reconnect.

`connect-sse` extension (`packages/appkit-ui/src/js/sse/connect-sse.ts`,
`types.ts`, `index.ts`):

- The generic SSE parser captures `event:` field names alongside
  `data:` payloads. `SSEMessage` gains `event?: string` so any
  AppKit SSE consumer can inspect the event name without re-parsing.
  Tests cover multi-line `data:` joining, CRLF normalisation, and
  comment-frame handling.
- Export the new typed helper from `index.ts`.

Gitignore:

- `apps/dev-playground/.gitignore` adds `tasks.*` / `*.wal` patterns
  as a defensive belt-and-braces around the existing `.appkit/`
  exclusion. The demo plugin may configure storage at the playground
  root for diagnostics; the additional patterns keep `tasks.db` and
  the rotating WAL out of git regardless of `databasePath`.

Verify:

- `pnpm -r typecheck`, `pnpm build`, `pnpm test` (125 files, 2304
  tests) all green.
- `pnpm exec biome check` clean on touched files.
- `pnpm exec knip` clean.

Risk. Demo plugin is unauthenticated by design (it ships with the
dev playground, not the SDK). `/crash/:id` returns 404 in production
via `NODE_ENV` gate; `enableTestMode` flips on `simulateCrash`. The
demo route handlers do not enforce auth — they assume the playground
sits behind the Databricks Apps proxy. Document in deployment notes.

Not in this PR. No production-plugin changes. No doc rewrite — that's
PR 7. The `subscribeToTaskflowTask` helper currently requires plugin
authors to redeclare `TEvents` client-side; a future follow-up (F26)
would derive it from the registered `TaskHandle`.

Stacked on: stack/taskflow/analytics-migration.

Signed-off-by: ditadi <victordperd@gmail.com>
@ditadi ditadi force-pushed the stack/taskflow/durable-task-demo branch from fe71e3a to c02bcd9 Compare May 12, 2026 17:25
@ditadi ditadi force-pushed the stack/taskflow/analytics-migration branch from bcda23c to 9a2926d Compare May 12, 2026 17:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant