From af2918a0939e7c8f8fa83159de1382ff1fab8e8c Mon Sep 17 00:00:00 2001 From: Saatvik Arya Date: Wed, 13 May 2026 16:02:59 +0530 Subject: [PATCH] feat(execution): add plugin execution observers --- apps/cloud/src/services/execution-stack.ts | 14 +- apps/cloud/src/services/executor.ts | 14 +- apps/local/src/server/main.ts | 8 +- packages/core/execution/src/engine.test.ts | 211 ++++++++++++- packages/core/execution/src/engine.ts | 281 +++++++++++++++++- .../core/sdk/src/execution-observer.test.ts | 73 +++++ packages/core/sdk/src/execution-observer.ts | 130 ++++++++ packages/core/sdk/src/index.ts | 22 ++ packages/core/sdk/src/plugin.ts | 10 + 9 files changed, 737 insertions(+), 26 deletions(-) create mode 100644 packages/core/sdk/src/execution-observer.test.ts create mode 100644 packages/core/sdk/src/execution-observer.ts diff --git a/apps/cloud/src/services/execution-stack.ts b/apps/cloud/src/services/execution-stack.ts index d901416c1..0a18a7c5c 100644 --- a/apps/cloud/src/services/execution-stack.ts +++ b/apps/cloud/src/services/execution-stack.ts @@ -8,11 +8,12 @@ import { env } from "cloudflare:workers"; import { Effect } from "effect"; import { createExecutionEngine } from "@executor-js/execution"; +import { composeExecutionObservers } from "@executor-js/sdk"; import { makeDynamicWorkerExecutor } from "@executor-js/runtime-dynamic-worker"; import { withExecutionUsageTracking } from "../api/execution-usage"; import { AutumnService } from "./autumn"; -import { createScopedExecutor } from "./executor"; +import { createScopedExecutorBundle } from "./executor"; export const makeExecutionStack = ( userId: string, @@ -20,14 +21,17 @@ export const makeExecutionStack = ( organizationName: string, ) => Effect.gen(function* () { - const executor = yield* createScopedExecutor(userId, organizationId, organizationName).pipe( - Effect.withSpan("McpSessionDO.createScopedExecutor"), - ); + const { executor, plugins } = yield* createScopedExecutorBundle( + userId, + organizationId, + organizationName, + ).pipe(Effect.withSpan("McpSessionDO.createScopedExecutor")); const codeExecutor = makeDynamicWorkerExecutor({ loader: env.LOADER }); + const observer = composeExecutionObservers(plugins, executor); const autumn = yield* AutumnService; const engine = withExecutionUsageTracking( organizationId, - createExecutionEngine({ executor, codeExecutor }), + createExecutionEngine({ executor, codeExecutor, observer }), (orgId) => Effect.runFork(autumn.trackExecution(orgId)), ); return { executor, engine }; diff --git a/apps/cloud/src/services/executor.ts b/apps/cloud/src/services/executor.ts index d909bc0aa..6313e30f8 100644 --- a/apps/cloud/src/services/executor.ts +++ b/apps/cloud/src/services/executor.ts @@ -55,7 +55,7 @@ const orgPlugins = (): CloudPlugins => // on the outer scope. // --------------------------------------------------------------------------- -export const createScopedExecutor = ( +export const createScopedExecutorBundle = ( userId: string, organizationId: string, organizationName: string, @@ -86,7 +86,7 @@ export const createScopedExecutor = ( // the opaque `InternalError({ traceId })` happens at the HTTP edge // via `withCapture` (see `api/protected-layers.ts`). That's // where `ErrorCaptureLive` (Sentry) gets wired in. - return yield* createExecutor({ + const executor = yield* createExecutor({ scopes: [userOrgScope, orgScope], adapter, blobs, @@ -94,4 +94,14 @@ export const createScopedExecutor = ( httpClientLayer, onElicitation: "accept-all", }); + return { executor, plugins }; }); + +export const createScopedExecutor = ( + userId: string, + organizationId: string, + organizationName: string, +) => + createScopedExecutorBundle(userId, organizationId, organizationName).pipe( + Effect.map(({ executor }) => executor), + ); diff --git a/apps/local/src/server/main.ts b/apps/local/src/server/main.ts index 6644c4785..1f8e51786 100644 --- a/apps/local/src/server/main.ts +++ b/apps/local/src/server/main.ts @@ -3,6 +3,7 @@ import { HttpRouter, HttpServer } from "effect/unstable/http"; import { Context, Effect, Layer, ManagedRuntime } from "effect"; import { observabilityMiddleware } from "@executor-js/api"; +import { composeExecutionObservers } from "@executor-js/sdk"; import { CoreHandlers, ExecutorService, @@ -61,7 +62,12 @@ const closeServerHandlers = async (handlers: ServerHandlers): Promise => { export const createServerHandlers = async (): Promise => { const { executor, plugins } = await getExecutorBundle(); - const engine = createExecutionEngine({ executor, codeExecutor: makeQuickJsExecutor() }); + const observer = composeExecutionObservers(plugins, executor); + const engine = createExecutionEngine({ + executor, + codeExecutor: makeQuickJsExecutor(), + observer, + }); const LocalApi = composePluginApi(plugins); // `ErrorCaptureLive` logs causes to the console and returns a short diff --git a/packages/core/execution/src/engine.test.ts b/packages/core/execution/src/engine.test.ts index d73d513b6..37e0681f4 100644 --- a/packages/core/execution/src/engine.test.ts +++ b/packages/core/execution/src/engine.test.ts @@ -1,7 +1,15 @@ import { describe, expect, it } from "@effect/vitest"; -import { Data, Effect, Exit } from "effect"; +import { Data, Effect, Exit, Predicate } from "effect"; -import { createExecutor, definePlugin, makeTestConfig } from "@executor-js/sdk"; +import { + ElicitationResponse, + FormElicitation, + createExecutor, + definePlugin, + makeTestConfig, + type ExecutionEvent, + type ExecutionObserver, +} from "@executor-js/sdk"; import type { CodeExecutor, ExecuteResult } from "@executor-js/codemode-core"; import { createExecutionEngine } from "./engine"; @@ -27,13 +35,100 @@ const succeedingExecutor: CodeExecutor = { execute: () => Effect.succeed({ result: "ok", logs: [] } satisfies ExecuteResult), }; +const invokingExecutor: CodeExecutor = { + execute: (_code, invoker) => + Effect.gen(function* () { + const result = yield* invoker + .invoke({ path: "echo.ping", args: { message: "hello" } }) + .pipe(Effect.orDie); + return { result, logs: ["called echo.ping"] } satisfies ExecuteResult; + }), +}; + +const elicitingExecutor: CodeExecutor = { + execute: (_code, invoker) => + Effect.gen(function* () { + const result = yield* invoker.invoke({ path: "forms.ask", args: {} }).pipe(Effect.orDie); + return { result, logs: [] } satisfies ExecuteResult; + }), +}; + const emptyPlugin = definePlugin(() => ({ id: "empty-test" as const, storage: () => ({}), staticSources: () => [], })); +const echoPlugin = definePlugin(() => ({ + id: "echo-test" as const, + storage: () => ({}), + staticSources: () => [ + { + id: "echo", + kind: "in-memory", + name: "Echo", + tools: [ + { + name: "ping", + description: "Return the provided arguments", + handler: ({ args }) => Effect.succeed({ ok: true, args }), + }, + ], + }, + ], +})); + +const formsPlugin = definePlugin(() => ({ + id: "forms-test" as const, + storage: () => ({}), + staticSources: () => [ + { + id: "forms", + kind: "in-memory", + name: "Forms", + tools: [ + { + name: "ask", + description: "Ask for form input", + handler: ({ elicit }) => + elicit( + FormElicitation.make({ + message: "Approve this action", + requestedSchema: {}, + }), + ), + }, + ], + }, + ], +})); + const makeExecutor = () => createExecutor(makeTestConfig({ plugins: [emptyPlugin()] as const })); +const makeEchoExecutor = () => createExecutor(makeTestConfig({ plugins: [echoPlugin()] as const })); +const makeFormsExecutor = () => + createExecutor(makeTestConfig({ plugins: [formsPlugin()] as const })); + +const collectEvents = (): { + readonly events: ExecutionEvent[]; + readonly observer: ExecutionObserver; +} => { + const events: ExecutionEvent[] = []; + return { + events, + observer: { + handle: (event) => Effect.sync(() => events.push(event)), + }, + }; +}; + +const eventKind = (event: ExecutionEvent): string => { + if (Predicate.isTagged(event, "ExecutionStarted")) return "ExecutionStarted"; + if (Predicate.isTagged(event, "ToolCallStarted")) return "ToolCallStarted"; + if (Predicate.isTagged(event, "ToolCallFinished")) return "ToolCallFinished"; + if (Predicate.isTagged(event, "InteractionStarted")) return "InteractionStarted"; + if (Predicate.isTagged(event, "InteractionResolved")) return "InteractionResolved"; + return "ExecutionFinished"; +}; describe("executeWithPause failure propagation", () => { it.effect("surfaces a fast codeExecutor failure as an Exit.Failure", () => @@ -84,3 +179,115 @@ describe("executeWithPause failure propagation", () => { }), ); }); + +describe("execution observers", () => { + it.effect("emits ordered lifecycle events for a successful tool call", () => + Effect.gen(function* () { + const executor = yield* makeEchoExecutor(); + const { events, observer } = collectEvents(); + const engine = createExecutionEngine({ + executor, + codeExecutor: invokingExecutor, + observer, + }); + + const result = yield* engine.execute("call echo", { + onElicitation: () => Effect.succeed(ElicitationResponse.make({ action: "accept" })), + trigger: { kind: "test", metadata: { suite: "observer" } }, + }); + + expect(result.error).toBeUndefined(); + expect(events.map(eventKind)).toEqual([ + "ExecutionStarted", + "ToolCallStarted", + "ToolCallFinished", + "ExecutionFinished", + ]); + expect(events[0]).toMatchObject({ + _tag: "ExecutionStarted", + code: "call echo", + trigger: { kind: "test", metadata: { suite: "observer" } }, + }); + expect(events[1]).toMatchObject({ _tag: "ToolCallStarted", path: "echo.ping" }); + expect(events[2]).toMatchObject({ + _tag: "ToolCallFinished", + path: "echo.ping", + status: "completed", + }); + expect(events[3]).toMatchObject({ _tag: "ExecutionFinished", status: "completed" }); + }), + ); + + it.effect("emits a failed terminal event when the code executor fails", () => + Effect.gen(function* () { + const executor = yield* makeExecutor(); + const { events, observer } = collectEvents(); + const engine = createExecutionEngine({ + executor, + codeExecutor: failingExecutor, + observer, + }); + + const exit = yield* Effect.exit( + engine.execute("bad code", { + onElicitation: () => Effect.succeed(ElicitationResponse.make({ action: "accept" })), + }), + ); + + expect(Exit.isFailure(exit)).toBe(true); + expect(events.map(eventKind)).toEqual(["ExecutionStarted", "ExecutionFinished"]); + expect(events[1]).toMatchObject({ _tag: "ExecutionFinished", status: "failed" }); + }), + ); + + it.effect("does not fail execution when an observer fails", () => + Effect.gen(function* () { + const executor = yield* makeExecutor(); + const engine = createExecutionEngine({ + executor, + codeExecutor: succeedingExecutor, + observer: { + handle: () => Effect.die("observer failed"), + }, + }); + + const result = yield* engine.executeWithPause("noop"); + expect(result.status).toBe("completed"); + }), + ); + + it.effect("emits interaction events for pause and resume", () => + Effect.gen(function* () { + const executor = yield* makeFormsExecutor(); + const { events, observer } = collectEvents(); + const engine = createExecutionEngine({ + executor, + codeExecutor: elicitingExecutor, + observer, + }); + + const paused = yield* engine.executeWithPause("ask user"); + expect(paused.status).toBe("paused"); + if (paused.status !== "paused") { + return; + } + + const completed = yield* engine.resume(paused.execution.id, { + action: "accept", + content: { approved: true }, + }); + expect(completed?.status).toBe("completed"); + + expect(events.map(eventKind)).toEqual([ + "ExecutionStarted", + "ToolCallStarted", + "InteractionStarted", + "InteractionResolved", + "ToolCallFinished", + "ExecutionFinished", + ]); + expect(events[2]).toMatchObject({ _tag: "InteractionStarted" }); + expect(events[3]).toMatchObject({ _tag: "InteractionResolved", status: "accepted" }); + }), + ); +}); diff --git a/packages/core/execution/src/engine.ts b/packages/core/execution/src/engine.ts index 793a1ded4..1d701a34f 100644 --- a/packages/core/execution/src/engine.ts +++ b/packages/core/execution/src/engine.ts @@ -1,5 +1,5 @@ import { Deferred, Effect, Fiber, Predicate, Ref } from "effect"; -import type * as Cause from "effect/Cause"; +import * as Cause from "effect/Cause"; import type { Executor, @@ -7,6 +7,23 @@ import type { ElicitationResponse, ElicitationHandler, ElicitationContext, + ExecutionObserver, + ExecutionTrigger, + ScopeId, +} from "@executor-js/sdk/core"; +import { + ExecutionFinished, + ExecutionId, + ExecutionInteractionId, + ExecutionStarted, + ExecutionToolCallId, + InteractionResolved, + InteractionStarted, + ScopeId as ScopeIdSchema, + ToolCallFinished, + ToolCallStarted, + ignoreExecutionObserverErrors, + noopExecutionObserver, } from "@executor-js/sdk/core"; import { CodeExecutionError } from "@executor-js/codemode-core"; import type { CodeExecutor, ExecuteResult, SandboxToolInvoker } from "@executor-js/codemode-core"; @@ -27,6 +44,7 @@ import { buildExecuteDescription } from "./description"; export type ExecutionEngineConfig = { readonly executor: Executor; readonly codeExecutor: CodeExecutor; + readonly observer?: ExecutionObserver; }; export type ExecutionResult = @@ -50,6 +68,14 @@ export type ResumeResponse = { readonly content?: Record; }; +export type ExecutionRunOptions = { + readonly trigger?: ExecutionTrigger; +}; + +export type InlineExecutionOptions = ExecutionRunOptions & { + readonly onElicitation: ElicitationHandler; +}; + // --------------------------------------------------------------------------- // Result formatting // --------------------------------------------------------------------------- @@ -300,6 +326,23 @@ const makeFullInvoker = (executor: Executor, invokeOptions: InvokeOptions): Sand }; }; +const makeExecutionId = (): ExecutionId => ExecutionId.make(`exec_${crypto.randomUUID()}`); +const makeToolCallId = (): ExecutionToolCallId => + ExecutionToolCallId.make(`tool_${crypto.randomUUID()}`); +const makeInteractionId = (): ExecutionInteractionId => + ExecutionInteractionId.make(`interaction_${crypto.randomUUID()}`); + +const causeToMessage = (cause: Cause.Cause): string => Cause.pretty(cause); + +const responseToInteractionStatus = ( + response: typeof ElicitationResponse.Type, +): "accepted" | "declined" | "cancelled" => + response.action === "accept" + ? "accepted" + : response.action === "decline" + ? "declined" + : "cancelled"; + // --------------------------------------------------------------------------- // Execution Engine // --------------------------------------------------------------------------- @@ -315,7 +358,7 @@ export type ExecutionEngine */ readonly execute: ( code: string, - options: { readonly onElicitation: ElicitationHandler }, + options: InlineExecutionOptions, ) => Effect.Effect; /** @@ -323,7 +366,10 @@ export type ExecutionEngine * Use this when the host doesn't support inline elicitation. * Returns either a completed result or a paused execution that can be resumed. */ - readonly executeWithPause: (code: string) => Effect.Effect; + readonly executeWithPause: ( + code: string, + options?: ExecutionRunOptions, + ) => Effect.Effect; /** * Resume a paused execution. Returns a completed result, a new pause, or @@ -344,8 +390,137 @@ export const createExecutionEngine = , ): ExecutionEngine => { const { executor, codeExecutor } = config; + const observer = ignoreExecutionObserverErrors(config.observer ?? noopExecutionObserver); const pausedExecutions = new Map>(); - let nextId = 0; + const scopeId = executor.scopes[0]?.id ?? ScopeIdSchema.make("unknown"); + + const emit = observer.handle; + + const observeToolCalls = ( + invoker: SandboxToolInvoker, + executionId: ExecutionId, + currentScopeId: ScopeId, + ): SandboxToolInvoker => ({ + invoke: ({ path, args }) => { + const toolCallId = makeToolCallId(); + return Effect.gen(function* () { + yield* emit( + new ToolCallStarted({ + executionId, + toolCallId, + scopeId: currentScopeId, + path, + args, + startedAt: new Date(), + }), + ); + + return yield* invoker.invoke({ path, args }).pipe( + Effect.tap((result) => + emit( + new ToolCallFinished({ + executionId, + toolCallId, + scopeId: currentScopeId, + path, + status: "completed", + result, + completedAt: new Date(), + }), + ), + ), + Effect.tapCause((cause) => + emit( + new ToolCallFinished({ + executionId, + toolCallId, + scopeId: currentScopeId, + path, + status: "failed", + error: causeToMessage(cause), + completedAt: new Date(), + }), + ), + ), + ); + }); + }, + }); + + const finishFromResult = ( + executionId: ExecutionId, + currentScopeId: ScopeId, + result: ExecuteResult, + ) => + new ExecutionFinished({ + executionId, + scopeId: currentScopeId, + status: result.error ? "failed" : "completed", + result: result.result, + error: result.error, + logs: result.logs, + completedAt: new Date(), + }); + + const finishFromCause = ( + executionId: ExecutionId, + currentScopeId: ScopeId, + cause: Cause.Cause, + ) => + new ExecutionFinished({ + executionId, + scopeId: currentScopeId, + status: "failed", + error: causeToMessage(cause), + completedAt: new Date(), + }); + + const observeInlineElicitation = ( + executionId: ExecutionId, + currentScopeId: ScopeId, + handler: ElicitationHandler, + ): ElicitationHandler => { + return (ctx) => { + const interactionId = makeInteractionId(); + return Effect.gen(function* () { + yield* emit( + new InteractionStarted({ + executionId, + interactionId, + scopeId: currentScopeId, + context: ctx, + startedAt: new Date(), + }), + ); + return yield* handler(ctx).pipe( + Effect.tap((response) => + emit( + new InteractionResolved({ + executionId, + interactionId, + scopeId: currentScopeId, + status: responseToInteractionStatus(response), + response, + completedAt: new Date(), + }), + ), + ), + Effect.tapCause((cause) => + emit( + new InteractionResolved({ + executionId, + interactionId, + scopeId: currentScopeId, + status: "failed", + error: causeToMessage(cause), + completedAt: new Date(), + }), + ), + ), + ); + }); + }; + }; /** * Race a running fiber against a pause signal. Returns when either @@ -379,12 +554,26 @@ export const createExecutionEngine = Effect.gen(function* () { const responseDeferred = yield* Deferred.make(); - const id = `exec_${++nextId}`; + const interactionId = makeInteractionId(); const paused: InternalPausedExecution = { - id, + id: executionId, elicitationContext: ctx, response: responseDeferred, fiber: fiber!, pauseSignalRef, }; - pausedExecutions.set(id, paused); + pausedExecutions.set(executionId, paused); const currentSignal = yield* Ref.get(pauseSignalRef); + yield* emit( + new InteractionStarted({ + executionId, + interactionId, + scopeId, + context: ctx, + startedAt: new Date(), + }), + ); yield* Deferred.succeed(currentSignal, paused); // Suspend until resume() completes responseDeferred. - return yield* Deferred.await(responseDeferred); + return yield* Deferred.await(responseDeferred).pipe( + Effect.tap((response) => + emit( + new InteractionResolved({ + executionId, + interactionId, + scopeId, + status: responseToInteractionStatus(response), + response, + completedAt: new Date(), + }), + ), + ), + Effect.tapCause((cause) => + emit( + new InteractionResolved({ + executionId, + interactionId, + scopeId, + status: "failed", + error: causeToMessage(cause), + completedAt: new Date(), + }), + ), + ), + ); }); - const invoker = makeFullInvoker(executor, { onElicitation: elicitationHandler }); + const invoker = observeToolCalls( + makeFullInvoker(executor, { onElicitation: elicitationHandler }), + executionId, + scopeId, + ); fiber = yield* Effect.forkDetach( - codeExecutor.execute(code, invoker).pipe(Effect.withSpan("executor.code.exec")), + codeExecutor.execute(code, invoker).pipe( + Effect.tap((result) => emit(finishFromResult(executionId, scopeId, result))), + Effect.tapCause((cause) => emit(finishFromCause(executionId, scopeId, cause))), + Effect.withSpan("executor.code.exec"), + ), ); const initialSignal = yield* Ref.get(pauseSignalRef); @@ -459,16 +690,34 @@ export const createExecutionEngine = emit(finishFromResult(executionId, scopeId, result))), + Effect.tapCause((cause) => emit(finishFromCause(executionId, scopeId, cause))), + Effect.withSpan("executor.code.exec"), + ); }); return { diff --git a/packages/core/sdk/src/execution-observer.test.ts b/packages/core/sdk/src/execution-observer.test.ts new file mode 100644 index 000000000..6367cd907 --- /dev/null +++ b/packages/core/sdk/src/execution-observer.test.ts @@ -0,0 +1,73 @@ +import { describe, expect, it } from "@effect/vitest"; +import { Effect } from "effect"; + +import { + ExecutionFinished, + ExecutionId, + ScopeId, + composeExecutionObservers, + definePlugin, +} from "./index"; + +const firstPlugin = definePlugin(() => ({ + id: "first" as const, + storage: () => ({}), + extension: () => ({ label: "first" }), + runtime: { + executionObserver: (self) => ({ + handle: () => Effect.sync(() => calls.push(self.label)), + }), + }, +})); + +const failingPlugin = definePlugin(() => ({ + id: "failing" as const, + storage: () => ({}), + extension: () => ({ label: "failing" }), + runtime: { + executionObserver: () => ({ + handle: () => Effect.die("observer failed"), + }), + }, +})); + +const lastPlugin = definePlugin(() => ({ + id: "last" as const, + storage: () => ({}), + extension: () => ({ label: "last" }), + runtime: { + executionObserver: (self) => ({ + handle: () => Effect.sync(() => calls.push(self.label)), + }), + }, +})); + +let calls: string[] = []; + +describe("composeExecutionObservers", () => { + it.effect("composes plugin observers in order and isolates observer failures", () => + Effect.gen(function* () { + calls = []; + const first = firstPlugin(); + const failing = failingPlugin(); + const last = lastPlugin(); + const observer = composeExecutionObservers([first, failing, last] as const, { + first: { label: "first" }, + failing: { label: "failing" }, + last: { label: "last" }, + }); + + yield* observer.handle( + new ExecutionFinished({ + executionId: ExecutionId.make("exec_test"), + scopeId: ScopeId.make("scope_test"), + status: "completed", + result: "ok", + completedAt: new Date(), + }), + ); + + expect(calls).toEqual(["first", "last"]); + }), + ); +}); diff --git a/packages/core/sdk/src/execution-observer.ts b/packages/core/sdk/src/execution-observer.ts new file mode 100644 index 000000000..da1b51160 --- /dev/null +++ b/packages/core/sdk/src/execution-observer.ts @@ -0,0 +1,130 @@ +import { Data, Effect, Schema } from "effect"; + +import type { ElicitationContext, ElicitationResponse } from "./elicitation"; +import { ScopeId } from "./ids"; +import type { AnyPlugin, PluginExtensions } from "./plugin"; + +export const ExecutionId = Schema.String.pipe(Schema.brand("ExecutionId")); +export type ExecutionId = typeof ExecutionId.Type; + +export const ExecutionToolCallId = Schema.String.pipe(Schema.brand("ExecutionToolCallId")); +export type ExecutionToolCallId = typeof ExecutionToolCallId.Type; + +export const ExecutionInteractionId = Schema.String.pipe(Schema.brand("ExecutionInteractionId")); +export type ExecutionInteractionId = typeof ExecutionInteractionId.Type; + +export type ExecutionTrigger = { + readonly kind: string; + readonly metadata?: Record; +}; + +export type ToolCallStatus = "completed" | "failed"; +export type InteractionStatus = "accepted" | "declined" | "cancelled" | "failed"; +export type ExecutionStatus = "completed" | "failed"; + +export class ExecutionStarted extends Data.TaggedClass("ExecutionStarted")<{ + readonly executionId: ExecutionId; + readonly scopeId: ScopeId; + readonly code: string; + readonly trigger?: ExecutionTrigger; + readonly startedAt: Date; +}> {} + +export class ToolCallStarted extends Data.TaggedClass("ToolCallStarted")<{ + readonly executionId: ExecutionId; + readonly toolCallId: ExecutionToolCallId; + readonly scopeId: ScopeId; + readonly path: string; + readonly args: unknown; + readonly startedAt: Date; +}> {} + +export class ToolCallFinished extends Data.TaggedClass("ToolCallFinished")<{ + readonly executionId: ExecutionId; + readonly toolCallId: ExecutionToolCallId; + readonly scopeId: ScopeId; + readonly path: string; + readonly status: ToolCallStatus; + readonly result?: unknown; + readonly error?: string; + readonly completedAt: Date; +}> {} + +export class InteractionStarted extends Data.TaggedClass("InteractionStarted")<{ + readonly executionId: ExecutionId; + readonly interactionId: ExecutionInteractionId; + readonly scopeId: ScopeId; + readonly context: ElicitationContext; + readonly startedAt: Date; +}> {} + +export class InteractionResolved extends Data.TaggedClass("InteractionResolved")<{ + readonly executionId: ExecutionId; + readonly interactionId: ExecutionInteractionId; + readonly scopeId: ScopeId; + readonly status: InteractionStatus; + readonly response?: ElicitationResponse; + readonly error?: string; + readonly completedAt: Date; +}> {} + +export class ExecutionFinished extends Data.TaggedClass("ExecutionFinished")<{ + readonly executionId: ExecutionId; + readonly scopeId: ScopeId; + readonly status: ExecutionStatus; + readonly result?: unknown; + readonly error?: string; + readonly logs?: readonly string[]; + readonly completedAt: Date; +}> {} + +export type ExecutionEvent = + | ExecutionStarted + | ToolCallStarted + | ToolCallFinished + | InteractionStarted + | InteractionResolved + | ExecutionFinished; + +export interface ExecutionObserver { + readonly handle: (event: ExecutionEvent) => Effect.Effect; +} + +export const noopExecutionObserver: ExecutionObserver = { + handle: () => Effect.void, +}; + +export const ignoreExecutionObserverErrors = ( + observer: ExecutionObserver, +): ExecutionObserver => ({ + handle: (event) => observer.handle(event).pipe(Effect.catchCause(() => Effect.void)), +}); + +export const composeExecutionObservers = ( + plugins: TPlugins, + extensions: PluginExtensions, +): ExecutionObserver => { + const observers: ExecutionObserver[] = []; + + for (const plugin of plugins) { + const observer = plugin.runtime?.executionObserver?.( + extensions[plugin.id as keyof PluginExtensions] as never, + ); + if (observer) { + observers.push(observer); + } + } + + if (observers.length === 0) { + return noopExecutionObserver; + } + + return { + handle: (event) => + Effect.forEach( + observers, + (observer) => observer.handle(event).pipe(Effect.catchCause(() => Effect.void)), + { discard: true }, + ), + }; +}; diff --git a/packages/core/sdk/src/index.ts b/packages/core/sdk/src/index.ts index e84082501..8ab71e56f 100644 --- a/packages/core/sdk/src/index.ts +++ b/packages/core/sdk/src/index.ts @@ -176,6 +176,28 @@ export { type ElicitationContext, } from "./elicitation"; +// Execution observer +export { + ExecutionId, + ExecutionToolCallId, + ExecutionInteractionId, + ExecutionStarted, + ToolCallStarted, + ToolCallFinished, + InteractionStarted, + InteractionResolved, + ExecutionFinished, + noopExecutionObserver, + ignoreExecutionObserverErrors, + composeExecutionObservers, + type ExecutionTrigger, + type ToolCallStatus, + type InteractionStatus, + type ExecutionStatus, + type ExecutionEvent, + type ExecutionObserver, +} from "./execution-observer"; + // Blob store export { type BlobStore, diff --git a/packages/core/sdk/src/plugin.ts b/packages/core/sdk/src/plugin.ts index 6c972c6a4..d2996b068 100644 --- a/packages/core/sdk/src/plugin.ts +++ b/packages/core/sdk/src/plugin.ts @@ -23,6 +23,7 @@ import type { ElicitationRequest, ElicitationResponse, } from "./elicitation"; +import type { ExecutionObserver } from "./execution-observer"; import type { ConnectionInUseError, ConnectionNotFoundError, @@ -449,6 +450,15 @@ export interface PluginSpec< * `({ args }) => self.addSpec(args)`. */ readonly staticSources?: (self: NoInfer) => readonly StaticSourceDecl[]; + /** Runtime hooks contributed by this plugin. These sit outside the + * extension API because the host composes them into process/runtime + * behavior instead of exposing them to end-user code. */ + readonly runtime?: { + /** Observe execution lifecycle events. Observer failures are captured by + * the composition helper and never fail the user execution. */ + readonly executionObserver?: (self: NoInfer) => ExecutionObserver; + }; + /** HttpApiGroup contributed by this plugin. Composed into the host's * `HttpApi` via the `addGroup` helper at runtime. The host mounts * the group at `/_executor/plugins/{id}/...` (or wherever the