Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions packages/appkit/src/context/execution-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,20 @@ export function getWorkspaceId(): Promise<string> {
}

/**
* Check if currently running in a user context.
* Check if currently running in a user context. Sugar for
* `getCurrentUserContext() !== null` β€” prefer {@link getCurrentUserContext}
* when you also need the value.
*/
export function isInUserContext(): boolean {
const ctx = executionContextStorage.getStore();
return ctx !== undefined;
return getCurrentUserContext() !== null;
}

/**
* Returns the active {@link UserContext} when inside a
* `runInUserContext` scope (set by `plugin.asUser(req).method(...)`),
* or `null`. Use to forward user identity to a downstream layer
* (e.g. spawning a durable task that should run as the caller).
*/
export function getCurrentUserContext(): UserContext | null {
return executionContextStorage.getStore() ?? null;
}
1 change: 1 addition & 0 deletions packages/appkit/src/context/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export {
getCurrentUserContext,
getCurrentUserId,
getExecutionContext,
getWarehouseId,
Expand Down
23 changes: 23 additions & 0 deletions packages/appkit/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,29 @@ export {
ResourceRegistry,
ResourceType,
} from "./registry";
// TaskFlow durable execution. `TaskflowService` is exported type-only:
// it's constructed by `createApp` and addressed via `this.taskflow`
// inside plugins; internal statics must not leak to consumers.
export {
type ExecuteTaskSettings,
type ResumeOptions,
type SseEvent,
type StopOptions,
type StreamEvent,
type SubmitOptions,
setupSseHeaders,
step,
TASKFLOW_IK_HEADER,
type Task,
type TaskContext,
type TaskDefinition,
type TaskEvent,
type TaskflowConfig,
type TaskflowService,
type TaskHandle,
type TypedTaskContext,
writeSseFrame,
} from "./taskflow";
// Telemetry (for advanced custom telemetry)
export {
type Counter,
Expand Down
67 changes: 59 additions & 8 deletions packages/appkit/src/plugin/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@ import type { PluginContext } from "../core/plugin-context";
import { AppKitError, AuthenticationError } from "../errors";
import { createLogger } from "../logging/logger";
import { StreamManager } from "../stream";
import { TaskflowService } from "../taskflow";
import {
type ExecuteTaskSettings,
TaskflowService,
type TaskRef,
} from "../taskflow";
import { executeTask as executeTaskImpl } from "../taskflow/execute-task";
import {
type ITelemetry,
normalizeTelemetryOptions,
Expand Down Expand Up @@ -74,11 +79,14 @@ function hasHttpStatusCode(

/**
* Methods that should not be proxied by asUser().
* These are lifecycle/internal methods that don't make sense
* to execute in a user context.
* Lifecycle/internal methods that don't make sense in a user context.
*
* Note: `executeTask` is deliberately NOT excluded β€” it MUST run inside
* the proxy's `runInUserContext` so `getCurrentUserContext()` forwards
* the OBO context to the engine. Excluding it would silently downgrade
* OBO calls to SP.
*/
const EXCLUDED_FROM_PROXY = new Set([
// Lifecycle methods
"setup",
"shutdown",
"attachContext",
Expand All @@ -87,12 +95,10 @@ const EXCLUDED_FROM_PROXY = new Set([
"getSkipBodyParsingPaths",
"abortActiveOperations",
"clientConfig",
// asUser itself - prevent chaining like .asUser().asUser()
// Prevent chained .asUser().asUser().
"asUser",
// Internal methods
"constructor",
// Synchronous accessor for the shared singleton β€” no need for an
// extra async-context frame.
// Synchronous singleton accessor; no need for an async-context frame.
"requireTaskflow",
]);

Expand Down Expand Up @@ -560,6 +566,51 @@ export abstract class Plugin<
);
}

/**
* Bridges a registered durable task to an SSE response.
*
* Submits via `taskflow.start(...)`, subscribes, writes each event as
* `id: <seq>\nevent: <name>\ndata: <json>`. Supports `Last-Event-ID`
* reconnect from the WAL.
*
* Identity comes from the active `runInUserContext` scope. For OBO,
* call through the proxy: `this.asUser(req).executeTask(...)`.
*
* Unlike `execute()` / `executeStream()`, this does not accept
* `retry` / `cache` / `timeout` β€” TaskFlow handles them natively.
*
* @example
* ```ts
* // SP
* await this.executeTask(res, "agent-loop", req.body);
*
* // OBO, no cancel on reconnect
* await this.asUser(req).executeTask(res, "agent-loop", req.body, {
* cancelOnDisconnect: false,
* });
* ```
*/
protected async executeTask<
TInput = unknown,
TResult = unknown,
TEvents extends Record<string, unknown> = Record<string, unknown>,
>(
res: express.Response,
task: string | TaskRef<TInput, TResult, TEvents>,
input: TInput,
settings?: ExecuteTaskSettings,
): Promise<void> {
const taskflow = this.requireTaskflow();
const taskName = typeof task === "string" ? task : task.name;
return executeTaskImpl(
{ taskflow, telemetry: this.telemetry, pluginName: this.name },
res,
taskName,
input,
settings,
);
}

/**
* Execute a function with the plugin's interceptor chain.
*
Expand Down
Loading