Skip to content

feat(appkit): executeTask, TypedTaskContext, SSE bridge, step#378

Draft
ditadi wants to merge 1 commit into
stack/taskflow/taskflow-servicefrom
stack/taskflow/execute-task
Draft

feat(appkit): executeTask, TypedTaskContext, SSE bridge, step#378
ditadi wants to merge 1 commit into
stack/taskflow/taskflow-servicefrom
stack/taskflow/execute-task

Conversation

@ditadi
Copy link
Copy Markdown
Contributor

@ditadi ditadi commented May 12, 2026

🥞 Stacked PR

Use this link to review incremental changes.


The public AppKit-side surface for TaskFlow durable execution. Plugin
authors can now register typed durable tasks
(TaskDefinition<TInput, TOutput, TEvents>), bridge them to SSE via
this.executeTask(res, name, input, settings?), use step() for
replay-safe checkpoints, and route OBO identity automatically through
runInUserContext + getCurrentUserContext.

Public surface added under packages/appkit/src/taskflow/:

  • TypedTaskContext<TEvents> — narrows ctx.emit(name, payload) to a
    declared event-name → payload-shape map so the SSE wire shape is
    typed end-to-end.
  • TaskDefinition<TInput, TResult, TEvents> and
    TaskHandleRef/TaskRef — the registration object accepted by
    taskflow.task(...) and the branded handle it returns. Phantom
    generic parameters propagate to executeTask so input / event maps
    flow without redundant generics.
  • ExecuteTaskSettings — strictly disjoint from PluginExecutionSettings.
    retry, cache, timeout, stream, and userId are typed as
    never so any caller copying a settings object from execute() /
    executeStream() gets a clear compile-time error. TaskFlow handles
    those concerns natively (smart recovery, IK dedup, cooperative
    stop()).
  • step(fn) / step("name", fn) — workflow primitive with deferred
    binding (resolves the engine wrapper on first invocation so plugin
    authors can declare const x = step(...) at module scope) and an
    explicit guard against anonymous-arrow WAL key collisions.
  • setupSseHeaders, writeSseFrame, writeSseComment,
    RESERVED_BRIDGE_EVENT_NAMES, TASKFLOW_IK_HEADER — SSE wire
    helpers with a CRLF guard on event names, multi-line `data:`
    framing, and `Vary: Origin` for cache safety.
  • userContextFromTaskCtx(ctx) — reads the OBO UserContext from
    the FFI sidecar in ctx.context, with a discriminator check
    (isUserContext) so a stale or wrong-shape payload returns null
    instead of being trusted.

executeTask (lives in taskflow/execute-task.ts) implements the
durable bridge:

  • Derives identity from getCurrentUserContext() only (never from
    request headers or settings) — this.asUser(req).executeTask(...)
    yields OBO, the bare call yields SP.
  • Submits via taskflow.start(...) with the engine sidecar carrying
    the OBO UserContext, then bridges the WAL stream to SSE with
    Last-Event-ID replay support clamped to
    [0, MAX_SAFE_INTEGER].
  • Drops custom:step:* events (WAL-only checkpoints), translates
    heartbeat to SSE comments, and routes terminal events
    (completed / failed / cancelled) before closing.
  • Reserved-name guard: plugin emissions whose name collides with the
    bridge wire vocabulary (ready, completed, failed, error,
    …) are dropped with logger.warn, never closing the stream.
  • BigInt-safe JSON replacer (warehouse payloads round-trip cleanly).
  • cancelOnDisconnect (default true) + disconnectGraceMs
    (default 5 s) — short reconnects don't kill the durable run.
  • Production-safe error path: post-headers errors mask the message
    when NODE_ENV=production.

TaskflowService extensions in taskflow/index.ts:

  • task<TInput, TResult, TEvents>(def) — typed registration with
    a hard-error on duplicate registration in production (silent
    shadowing was the prior failure mode); HMR warning in dev.
  • hasTask(name) / getRegistration(name) — used by executeTask
    to surface OBO-misconfiguration diagnostics.
  • _registerBridge(bridge) — bookkeeping for active SSE bridges so
    shutdown() can drain them with an explicit
    event: error / server_shutting_down frame before the engine
    closes iterators (otherwise the client sees mid-stream EOF).

Context layer:

  • getCurrentUserContext() — public accessor for the active OBO
    UserContext (sugar for the same AsyncLocalStorage slot
    isInUserContext() already reads).

Plugin layer:

  • Plugin.executeTask(res, task, input, settings?) — typed entry
    point. task accepts either a registered name (string) or a
    branded TaskRef. The method is deliberately NOT in
    EXCLUDED_FROM_PROXY so OBO routing via the asUser proxy works
    uniformly with execute / executeStream.

Tests scaffolding:

  • tools/test-helpers.tscreateStubTaskflowService (in-process
    fake of the TaskFlow surface for unit tests; consumed by PR 5's
    analytics tests) and a robust parseSSEResponse upgrade (multi-line
    data: joining per the SSE spec, CRLF normalisation, comment
    skipping, eventType filtering).

Verify:

  • pnpm -r typecheck, pnpm build, pnpm test (122 files, 2279
    tests) all green.
  • pnpm exec knip clean (no unused exports or types).
  • pnpm exec biome check clean on touched files.

Not in this PR. No plugin uses executeTask yet — analytics migration
is PR 5. No demo plugin — that's PR 6. No docs rewrite — that's PR 7.

Stacked on: stack/taskflow/taskflow-service (#376).

Signed-off-by: Victor Ditadi victor.ditadi@databricks.com
Signed-off-by: ditadi victordperd@gmail.com

The public AppKit-side surface for TaskFlow durable execution. Plugin
authors can now register typed durable tasks
(`TaskDefinition<TInput, TOutput, TEvents>`), bridge them to SSE via
`this.executeTask(res, name, input, settings?)`, use `step()` for
replay-safe checkpoints, and route OBO identity automatically through
`runInUserContext` + `getCurrentUserContext`.

Public surface added under `packages/appkit/src/taskflow/`:

- `TypedTaskContext<TEvents>` — narrows `ctx.emit(name, payload)` to a
  declared event-name → payload-shape map so the SSE wire shape is
  typed end-to-end.
- `TaskDefinition<TInput, TResult, TEvents>` and
  `TaskHandleRef`/`TaskRef` — the registration object accepted by
  `taskflow.task(...)` and the branded handle it returns. Phantom
  generic parameters propagate to `executeTask` so input / event maps
  flow without redundant generics.
- `ExecuteTaskSettings` — strictly disjoint from `PluginExecutionSettings`.
  `retry`, `cache`, `timeout`, `stream`, and `userId` are typed as
  `never` so any caller copying a settings object from `execute()` /
  `executeStream()` gets a clear compile-time error. TaskFlow handles
  those concerns natively (smart recovery, IK dedup, cooperative
  `stop()`).
- `step(fn)` / `step("name", fn)` — workflow primitive with deferred
  binding (resolves the engine wrapper on first invocation so plugin
  authors can declare `const x = step(...)` at module scope) and an
  explicit guard against anonymous-arrow WAL key collisions.
- `setupSseHeaders`, `writeSseFrame`, `writeSseComment`,
  `RESERVED_BRIDGE_EVENT_NAMES`, `TASKFLOW_IK_HEADER` — SSE wire
  helpers with a CRLF guard on event names, multi-line \`data:\`
  framing, and \`Vary: Origin\` for cache safety.
- `userContextFromTaskCtx(ctx)` — reads the OBO `UserContext` from
  the FFI sidecar in `ctx.context`, with a discriminator check
  (`isUserContext`) so a stale or wrong-shape payload returns null
  instead of being trusted.

`executeTask` (lives in `taskflow/execute-task.ts`) implements the
durable bridge:

- Derives identity from `getCurrentUserContext()` only (never from
  request headers or settings) — `this.asUser(req).executeTask(...)`
  yields OBO, the bare call yields SP.
- Submits via `taskflow.start(...)` with the engine sidecar carrying
  the OBO `UserContext`, then bridges the WAL stream to SSE with
  `Last-Event-ID` replay support clamped to
  `[0, MAX_SAFE_INTEGER]`.
- Drops `custom:step:*` events (WAL-only checkpoints), translates
  `heartbeat` to SSE comments, and routes terminal events
  (`completed` / `failed` / `cancelled`) before closing.
- Reserved-name guard: plugin emissions whose name collides with the
  bridge wire vocabulary (`ready`, `completed`, `failed`, `error`,
  …) are dropped with `logger.warn`, never closing the stream.
- BigInt-safe JSON replacer (warehouse payloads round-trip cleanly).
- `cancelOnDisconnect` (default `true`) + `disconnectGraceMs`
  (default 5 s) — short reconnects don't kill the durable run.
- Production-safe error path: post-headers errors mask the message
  when `NODE_ENV=production`.

`TaskflowService` extensions in `taskflow/index.ts`:

- `task<TInput, TResult, TEvents>(def)` — typed registration with
  a hard-error on duplicate registration in production (silent
  shadowing was the prior failure mode); HMR warning in dev.
- `hasTask(name)` / `getRegistration(name)` — used by `executeTask`
  to surface OBO-misconfiguration diagnostics.
- `_registerBridge(bridge)` — bookkeeping for active SSE bridges so
  `shutdown()` can drain them with an explicit
  `event: error` / `server_shutting_down` frame *before* the engine
  closes iterators (otherwise the client sees mid-stream EOF).

Context layer:

- `getCurrentUserContext()` — public accessor for the active OBO
  `UserContext` (sugar for the same AsyncLocalStorage slot
  `isInUserContext()` already reads).

Plugin layer:

- `Plugin.executeTask(res, task, input, settings?)` — typed entry
  point. `task` accepts either a registered name (`string`) or a
  branded `TaskRef`. The method is deliberately NOT in
  `EXCLUDED_FROM_PROXY` so OBO routing via the `asUser` proxy works
  uniformly with `execute` / `executeStream`.

Tests scaffolding:

- `tools/test-helpers.ts` — `createStubTaskflowService` (in-process
  fake of the TaskFlow surface for unit tests; consumed by PR 5's
  analytics tests) and a robust `parseSSEResponse` upgrade (multi-line
  `data:` joining per the SSE spec, CRLF normalisation, comment
  skipping, `eventType` filtering).

Verify:

- `pnpm -r typecheck`, `pnpm build`, `pnpm test` (122 files, 2279
  tests) all green.
- `pnpm exec knip` clean (no unused exports or types).
- `pnpm exec biome check` clean on touched files.

Not in this PR. No plugin uses `executeTask` yet — analytics migration
is PR 5. No demo plugin — that's PR 6. No docs rewrite — that's PR 7.

Stacked on: stack/taskflow/taskflow-service (#376).

Signed-off-by: Victor Ditadi <victor.ditadi@databricks.com>
Signed-off-by: ditadi <victordperd@gmail.com>
@ditadi ditadi force-pushed the stack/taskflow/taskflow-service branch from 9598bd2 to eebf00c Compare May 12, 2026 17:25
@ditadi ditadi force-pushed the stack/taskflow/execute-task branch from 206704d to 13a91d1 Compare May 12, 2026 17:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant