Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions apps/cloud/src/services/execution-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,30 @@ import { env } from "cloudflare:workers";
import { Effect } from "effect";

import { createExecutionEngine } from "@executor-js/execution";
import { composeExecutionObservers } from "@executor-js/sdk";
import { makeDynamicWorkerExecutor } from "@executor-js/runtime-dynamic-worker";

import { withExecutionUsageTracking } from "../api/execution-usage";
import { AutumnService } from "./autumn";
import { createScopedExecutor } from "./executor";
import { createScopedExecutorBundle } from "./executor";

export const makeExecutionStack = (
userId: string,
organizationId: string,
organizationName: string,
) =>
Effect.gen(function* () {
const executor = yield* createScopedExecutor(userId, organizationId, organizationName).pipe(
Effect.withSpan("McpSessionDO.createScopedExecutor"),
);
const { executor, plugins } = yield* createScopedExecutorBundle(
userId,
organizationId,
organizationName,
).pipe(Effect.withSpan("McpSessionDO.createScopedExecutor"));
const codeExecutor = makeDynamicWorkerExecutor({ loader: env.LOADER });
const observer = composeExecutionObservers(plugins, executor);
const autumn = yield* AutumnService;
const engine = withExecutionUsageTracking(
organizationId,
createExecutionEngine({ executor, codeExecutor }),
createExecutionEngine({ executor, codeExecutor, observer }),
(orgId) => Effect.runFork(autumn.trackExecution(orgId)),
);
return { executor, engine };
Expand Down
14 changes: 12 additions & 2 deletions apps/cloud/src/services/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ const orgPlugins = (): CloudPlugins =>
// on the outer scope.
// ---------------------------------------------------------------------------

export const createScopedExecutor = (
export const createScopedExecutorBundle = (
userId: string,
organizationId: string,
organizationName: string,
Expand Down Expand Up @@ -86,12 +86,22 @@ export const createScopedExecutor = (
// the opaque `InternalError({ traceId })` happens at the HTTP edge
// via `withCapture` (see `api/protected-layers.ts`). That's
// where `ErrorCaptureLive` (Sentry) gets wired in.
return yield* createExecutor({
const executor = yield* createExecutor({
scopes: [userOrgScope, orgScope],
adapter,
blobs,
plugins,
httpClientLayer,
onElicitation: "accept-all",
});
return { executor, plugins };
});

export const createScopedExecutor = (
userId: string,
organizationId: string,
organizationName: string,
) =>
createScopedExecutorBundle(userId, organizationId, organizationName).pipe(
Effect.map(({ executor }) => executor),
);
8 changes: 7 additions & 1 deletion apps/local/src/server/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { HttpRouter, HttpServer } from "effect/unstable/http";
import { Context, Effect, Layer, ManagedRuntime } from "effect";

import { observabilityMiddleware } from "@executor-js/api";
import { composeExecutionObservers } from "@executor-js/sdk";
import {
CoreHandlers,
ExecutorService,
Expand Down Expand Up @@ -61,7 +62,12 @@ const closeServerHandlers = async (handlers: ServerHandlers): Promise<void> => {

export const createServerHandlers = async (): Promise<ServerHandlers> => {
const { executor, plugins } = await getExecutorBundle();
const engine = createExecutionEngine({ executor, codeExecutor: makeQuickJsExecutor() });
const observer = composeExecutionObservers(plugins, executor);
const engine = createExecutionEngine({
executor,
codeExecutor: makeQuickJsExecutor(),
observer,
});

const LocalApi = composePluginApi(plugins);
// `ErrorCaptureLive` logs causes to the console and returns a short
Expand Down
211 changes: 209 additions & 2 deletions packages/core/execution/src/engine.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
import { describe, expect, it } from "@effect/vitest";
import { Data, Effect, Exit } from "effect";
import { Data, Effect, Exit, Predicate } from "effect";

import { createExecutor, definePlugin, makeTestConfig } from "@executor-js/sdk";
import {
ElicitationResponse,
FormElicitation,
createExecutor,
definePlugin,
makeTestConfig,
type ExecutionEvent,
type ExecutionObserver,
} from "@executor-js/sdk";
import type { CodeExecutor, ExecuteResult } from "@executor-js/codemode-core";

import { createExecutionEngine } from "./engine";
Expand All @@ -27,13 +35,100 @@ const succeedingExecutor: CodeExecutor<FakeRuntimeError> = {
execute: () => Effect.succeed({ result: "ok", logs: [] } satisfies ExecuteResult),
};

const invokingExecutor: CodeExecutor<FakeRuntimeError> = {
execute: (_code, invoker) =>
Effect.gen(function* () {
const result = yield* invoker
.invoke({ path: "echo.ping", args: { message: "hello" } })
.pipe(Effect.orDie);
return { result, logs: ["called echo.ping"] } satisfies ExecuteResult;
}),
};

const elicitingExecutor: CodeExecutor<FakeRuntimeError> = {
execute: (_code, invoker) =>
Effect.gen(function* () {
const result = yield* invoker.invoke({ path: "forms.ask", args: {} }).pipe(Effect.orDie);
return { result, logs: [] } satisfies ExecuteResult;
}),
};

const emptyPlugin = definePlugin(() => ({
id: "empty-test" as const,
storage: () => ({}),
staticSources: () => [],
}));

const echoPlugin = definePlugin(() => ({
id: "echo-test" as const,
storage: () => ({}),
staticSources: () => [
{
id: "echo",
kind: "in-memory",
name: "Echo",
tools: [
{
name: "ping",
description: "Return the provided arguments",
handler: ({ args }) => Effect.succeed({ ok: true, args }),
},
],
},
],
}));

const formsPlugin = definePlugin(() => ({
id: "forms-test" as const,
storage: () => ({}),
staticSources: () => [
{
id: "forms",
kind: "in-memory",
name: "Forms",
tools: [
{
name: "ask",
description: "Ask for form input",
handler: ({ elicit }) =>
elicit(
FormElicitation.make({
message: "Approve this action",
requestedSchema: {},
}),
),
},
],
},
],
}));

const makeExecutor = () => createExecutor(makeTestConfig({ plugins: [emptyPlugin()] as const }));
const makeEchoExecutor = () => createExecutor(makeTestConfig({ plugins: [echoPlugin()] as const }));
const makeFormsExecutor = () =>
createExecutor(makeTestConfig({ plugins: [formsPlugin()] as const }));

const collectEvents = (): {
readonly events: ExecutionEvent[];
readonly observer: ExecutionObserver;
} => {
const events: ExecutionEvent[] = [];
return {
events,
observer: {
handle: (event) => Effect.sync(() => events.push(event)),
},
};
};

const eventKind = (event: ExecutionEvent): string => {
if (Predicate.isTagged(event, "ExecutionStarted")) return "ExecutionStarted";
if (Predicate.isTagged(event, "ToolCallStarted")) return "ToolCallStarted";
if (Predicate.isTagged(event, "ToolCallFinished")) return "ToolCallFinished";
if (Predicate.isTagged(event, "InteractionStarted")) return "InteractionStarted";
if (Predicate.isTagged(event, "InteractionResolved")) return "InteractionResolved";
return "ExecutionFinished";
};

describe("executeWithPause failure propagation", () => {
it.effect("surfaces a fast codeExecutor failure as an Exit.Failure", () =>
Expand Down Expand Up @@ -84,3 +179,115 @@ describe("executeWithPause failure propagation", () => {
}),
);
});

describe("execution observers", () => {
it.effect("emits ordered lifecycle events for a successful tool call", () =>
Effect.gen(function* () {
const executor = yield* makeEchoExecutor();
const { events, observer } = collectEvents();
const engine = createExecutionEngine({
executor,
codeExecutor: invokingExecutor,
observer,
});

const result = yield* engine.execute("call echo", {
onElicitation: () => Effect.succeed(ElicitationResponse.make({ action: "accept" })),
trigger: { kind: "test", metadata: { suite: "observer" } },
});

expect(result.error).toBeUndefined();
expect(events.map(eventKind)).toEqual([
"ExecutionStarted",
"ToolCallStarted",
"ToolCallFinished",
"ExecutionFinished",
]);
expect(events[0]).toMatchObject({
_tag: "ExecutionStarted",
code: "call echo",
trigger: { kind: "test", metadata: { suite: "observer" } },
});
expect(events[1]).toMatchObject({ _tag: "ToolCallStarted", path: "echo.ping" });
expect(events[2]).toMatchObject({
_tag: "ToolCallFinished",
path: "echo.ping",
status: "completed",
});
expect(events[3]).toMatchObject({ _tag: "ExecutionFinished", status: "completed" });
}),
);

it.effect("emits a failed terminal event when the code executor fails", () =>
Effect.gen(function* () {
const executor = yield* makeExecutor();
const { events, observer } = collectEvents();
const engine = createExecutionEngine({
executor,
codeExecutor: failingExecutor,
observer,
});

const exit = yield* Effect.exit(
engine.execute("bad code", {
onElicitation: () => Effect.succeed(ElicitationResponse.make({ action: "accept" })),
}),
);

expect(Exit.isFailure(exit)).toBe(true);
expect(events.map(eventKind)).toEqual(["ExecutionStarted", "ExecutionFinished"]);
expect(events[1]).toMatchObject({ _tag: "ExecutionFinished", status: "failed" });
}),
);

it.effect("does not fail execution when an observer fails", () =>
Effect.gen(function* () {
const executor = yield* makeExecutor();
const engine = createExecutionEngine({
executor,
codeExecutor: succeedingExecutor,
observer: {
handle: () => Effect.die("observer failed"),
},
});

const result = yield* engine.executeWithPause("noop");
expect(result.status).toBe("completed");
}),
);

it.effect("emits interaction events for pause and resume", () =>
Effect.gen(function* () {
const executor = yield* makeFormsExecutor();
const { events, observer } = collectEvents();
const engine = createExecutionEngine({
executor,
codeExecutor: elicitingExecutor,
observer,
});

const paused = yield* engine.executeWithPause("ask user");
expect(paused.status).toBe("paused");
if (paused.status !== "paused") {
return;
}

const completed = yield* engine.resume(paused.execution.id, {
action: "accept",
content: { approved: true },
});
expect(completed?.status).toBe("completed");

expect(events.map(eventKind)).toEqual([
"ExecutionStarted",
"ToolCallStarted",
"InteractionStarted",
"InteractionResolved",
"ToolCallFinished",
"ExecutionFinished",
]);
expect(events[2]).toMatchObject({ _tag: "InteractionStarted" });
expect(events[3]).toMatchObject({ _tag: "InteractionResolved", status: "accepted" });
}),
);
});
Loading
Loading