feat(appkit): executeTask, TypedTaskContext, SSE bridge, step#378
Draft
ditadi wants to merge 1 commit into
Draft
feat(appkit): executeTask, TypedTaskContext, SSE bridge, step#378ditadi wants to merge 1 commit into
ditadi wants to merge 1 commit into
Conversation
This was referenced May 12, 2026
14d0870 to
206704d
Compare
The public AppKit-side surface for TaskFlow durable execution. Plugin
authors can now register typed durable tasks
(`TaskDefinition<TInput, TOutput, TEvents>`), bridge them to SSE via
`this.executeTask(res, name, input, settings?)`, use `step()` for
replay-safe checkpoints, and route OBO identity automatically through
`runInUserContext` + `getCurrentUserContext`.
Public surface added under `packages/appkit/src/taskflow/`:
- `TypedTaskContext<TEvents>` — narrows `ctx.emit(name, payload)` to a
declared event-name → payload-shape map so the SSE wire shape is
typed end-to-end.
- `TaskDefinition<TInput, TResult, TEvents>` and
`TaskHandleRef`/`TaskRef` — the registration object accepted by
`taskflow.task(...)` and the branded handle it returns. Phantom
generic parameters propagate to `executeTask` so input / event maps
flow without redundant generics.
- `ExecuteTaskSettings` — strictly disjoint from `PluginExecutionSettings`.
`retry`, `cache`, `timeout`, `stream`, and `userId` are typed as
`never` so any caller copying a settings object from `execute()` /
`executeStream()` gets a clear compile-time error. TaskFlow handles
those concerns natively (smart recovery, IK dedup, cooperative
`stop()`).
- `step(fn)` / `step("name", fn)` — workflow primitive with deferred
binding (resolves the engine wrapper on first invocation so plugin
authors can declare `const x = step(...)` at module scope) and an
explicit guard against anonymous-arrow WAL key collisions.
- `setupSseHeaders`, `writeSseFrame`, `writeSseComment`,
`RESERVED_BRIDGE_EVENT_NAMES`, `TASKFLOW_IK_HEADER` — SSE wire
helpers with a CRLF guard on event names, multi-line \`data:\`
framing, and \`Vary: Origin\` for cache safety.
- `userContextFromTaskCtx(ctx)` — reads the OBO `UserContext` from
the FFI sidecar in `ctx.context`, with a discriminator check
(`isUserContext`) so a stale or wrong-shape payload returns null
instead of being trusted.
`executeTask` (lives in `taskflow/execute-task.ts`) implements the
durable bridge:
- Derives identity from `getCurrentUserContext()` only (never from
request headers or settings) — `this.asUser(req).executeTask(...)`
yields OBO, the bare call yields SP.
- Submits via `taskflow.start(...)` with the engine sidecar carrying
the OBO `UserContext`, then bridges the WAL stream to SSE with
`Last-Event-ID` replay support clamped to
`[0, MAX_SAFE_INTEGER]`.
- Drops `custom:step:*` events (WAL-only checkpoints), translates
`heartbeat` to SSE comments, and routes terminal events
(`completed` / `failed` / `cancelled`) before closing.
- Reserved-name guard: plugin emissions whose name collides with the
bridge wire vocabulary (`ready`, `completed`, `failed`, `error`,
…) are dropped with `logger.warn`, never closing the stream.
- BigInt-safe JSON replacer (warehouse payloads round-trip cleanly).
- `cancelOnDisconnect` (default `true`) + `disconnectGraceMs`
(default 5 s) — short reconnects don't kill the durable run.
- Production-safe error path: post-headers errors mask the message
when `NODE_ENV=production`.
`TaskflowService` extensions in `taskflow/index.ts`:
- `task<TInput, TResult, TEvents>(def)` — typed registration with
a hard-error on duplicate registration in production (silent
shadowing was the prior failure mode); HMR warning in dev.
- `hasTask(name)` / `getRegistration(name)` — used by `executeTask`
to surface OBO-misconfiguration diagnostics.
- `_registerBridge(bridge)` — bookkeeping for active SSE bridges so
`shutdown()` can drain them with an explicit
`event: error` / `server_shutting_down` frame *before* the engine
closes iterators (otherwise the client sees mid-stream EOF).
Context layer:
- `getCurrentUserContext()` — public accessor for the active OBO
`UserContext` (sugar for the same AsyncLocalStorage slot
`isInUserContext()` already reads).
Plugin layer:
- `Plugin.executeTask(res, task, input, settings?)` — typed entry
point. `task` accepts either a registered name (`string`) or a
branded `TaskRef`. The method is deliberately NOT in
`EXCLUDED_FROM_PROXY` so OBO routing via the `asUser` proxy works
uniformly with `execute` / `executeStream`.
Tests scaffolding:
- `tools/test-helpers.ts` — `createStubTaskflowService` (in-process
fake of the TaskFlow surface for unit tests; consumed by PR 5's
analytics tests) and a robust `parseSSEResponse` upgrade (multi-line
`data:` joining per the SSE spec, CRLF normalisation, comment
skipping, `eventType` filtering).
Verify:
- `pnpm -r typecheck`, `pnpm build`, `pnpm test` (122 files, 2279
tests) all green.
- `pnpm exec knip` clean (no unused exports or types).
- `pnpm exec biome check` clean on touched files.
Not in this PR. No plugin uses `executeTask` yet — analytics migration
is PR 5. No demo plugin — that's PR 6. No docs rewrite — that's PR 7.
Stacked on: stack/taskflow/taskflow-service (#376).
Signed-off-by: Victor Ditadi <victor.ditadi@databricks.com>
Signed-off-by: ditadi <victordperd@gmail.com>
9598bd2 to
eebf00c
Compare
206704d to
13a91d1
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
🥞 Stacked PR
Use this link to review incremental changes.
The public AppKit-side surface for TaskFlow durable execution. Plugin
authors can now register typed durable tasks
(
TaskDefinition<TInput, TOutput, TEvents>), bridge them to SSE viathis.executeTask(res, name, input, settings?), usestep()forreplay-safe checkpoints, and route OBO identity automatically through
runInUserContext+getCurrentUserContext.Public surface added under
packages/appkit/src/taskflow/:TypedTaskContext<TEvents>— narrowsctx.emit(name, payload)to adeclared event-name → payload-shape map so the SSE wire shape is
typed end-to-end.
TaskDefinition<TInput, TResult, TEvents>andTaskHandleRef/TaskRef— the registration object accepted bytaskflow.task(...)and the branded handle it returns. Phantomgeneric parameters propagate to
executeTaskso input / event mapsflow without redundant generics.
ExecuteTaskSettings— strictly disjoint fromPluginExecutionSettings.retry,cache,timeout,stream, anduserIdare typed asneverso any caller copying a settings object fromexecute()/executeStream()gets a clear compile-time error. TaskFlow handlesthose concerns natively (smart recovery, IK dedup, cooperative
stop()).step(fn)/step("name", fn)— workflow primitive with deferredbinding (resolves the engine wrapper on first invocation so plugin
authors can declare
const x = step(...)at module scope) and anexplicit guard against anonymous-arrow WAL key collisions.
setupSseHeaders,writeSseFrame,writeSseComment,RESERVED_BRIDGE_EVENT_NAMES,TASKFLOW_IK_HEADER— SSE wirehelpers with a CRLF guard on event names, multi-line `data:`
framing, and `Vary: Origin` for cache safety.
userContextFromTaskCtx(ctx)— reads the OBOUserContextfromthe FFI sidecar in
ctx.context, with a discriminator check(
isUserContext) so a stale or wrong-shape payload returns nullinstead of being trusted.
executeTask(lives intaskflow/execute-task.ts) implements thedurable bridge:
getCurrentUserContext()only (never fromrequest headers or settings) —
this.asUser(req).executeTask(...)yields OBO, the bare call yields SP.
taskflow.start(...)with the engine sidecar carryingthe OBO
UserContext, then bridges the WAL stream to SSE withLast-Event-IDreplay support clamped to[0, MAX_SAFE_INTEGER].custom:step:*events (WAL-only checkpoints), translatesheartbeatto SSE comments, and routes terminal events(
completed/failed/cancelled) before closing.bridge wire vocabulary (
ready,completed,failed,error,…) are dropped with
logger.warn, never closing the stream.cancelOnDisconnect(defaulttrue) +disconnectGraceMs(default 5 s) — short reconnects don't kill the durable run.
when
NODE_ENV=production.TaskflowServiceextensions intaskflow/index.ts:task<TInput, TResult, TEvents>(def)— typed registration witha hard-error on duplicate registration in production (silent
shadowing was the prior failure mode); HMR warning in dev.
hasTask(name)/getRegistration(name)— used byexecuteTaskto surface OBO-misconfiguration diagnostics.
_registerBridge(bridge)— bookkeeping for active SSE bridges soshutdown()can drain them with an explicitevent: error/server_shutting_downframe before the enginecloses iterators (otherwise the client sees mid-stream EOF).
Context layer:
getCurrentUserContext()— public accessor for the active OBOUserContext(sugar for the same AsyncLocalStorage slotisInUserContext()already reads).Plugin layer:
Plugin.executeTask(res, task, input, settings?)— typed entrypoint.
taskaccepts either a registered name (string) or abranded
TaskRef. The method is deliberately NOT inEXCLUDED_FROM_PROXYso OBO routing via theasUserproxy worksuniformly with
execute/executeStream.Tests scaffolding:
tools/test-helpers.ts—createStubTaskflowService(in-processfake of the TaskFlow surface for unit tests; consumed by PR 5's
analytics tests) and a robust
parseSSEResponseupgrade (multi-linedata:joining per the SSE spec, CRLF normalisation, commentskipping,
eventTypefiltering).Verify:
pnpm -r typecheck,pnpm build,pnpm test(122 files, 2279tests) all green.
pnpm exec knipclean (no unused exports or types).pnpm exec biome checkclean on touched files.Not in this PR. No plugin uses
executeTaskyet — analytics migrationis PR 5. No demo plugin — that's PR 6. No docs rewrite — that's PR 7.
Stacked on: stack/taskflow/taskflow-service (#376).
Signed-off-by: Victor Ditadi victor.ditadi@databricks.com
Signed-off-by: ditadi victordperd@gmail.com