Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@ coverage
.turbo

.databricks

# TaskFlow durable storage (SQLite + WAL); per-machine, never checked in.
.appkit/
5 changes: 4 additions & 1 deletion apps/dev-playground/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,7 @@ test-results/
playwright-report/

# Auto-generated types (endpoint-specific, varies per developer)
shared/appkit-types/serving.d.ts
shared/appkit-types/serving.d.ts

# TaskFlow durable storage (SQLite + WAL); per-machine, never checked in.
.appkit/
1 change: 1 addition & 0 deletions knip.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"**/*.generated.ts",
"**/*.example.tsx",
"**/*.css",
"packages/appkit/vendor/**",
"packages/appkit/src/plugins/vector-search/**",
"packages/appkit/src/plugin/index.ts",
"packages/appkit/src/plugin/to-plugin.ts",
Expand Down
4 changes: 4 additions & 0 deletions packages/appkit/src/core/appkit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
TelemetryReporter,
} from "../internal-telemetry";
import { ResourceRegistry, ResourceType } from "../registry";
import type { TaskflowConfig } from "../taskflow";
import type { TelemetryConfig } from "../telemetry";
import { isToolProvider, PluginContext } from "./plugin-context";
import { type ServiceManager, startCoreServices } from "./service-manager";
Expand Down Expand Up @@ -193,6 +194,7 @@ export class AppKit<TPlugins extends InputPluginMap> {
plugins?: T;
telemetry?: TelemetryConfig;
cache?: CacheConfig;
taskflow?: TaskflowConfig | false;
client?: WorkspaceClient;
onPluginsReady?: (appkit: PluginMap<T>) => void | Promise<void>;
disableInternalTelemetry?: boolean;
Expand All @@ -201,6 +203,7 @@ export class AppKit<TPlugins extends InputPluginMap> {
const services = await startCoreServices({
telemetry: config?.telemetry,
cache: config?.cache,
taskflow: config?.taskflow,
});

try {
Expand Down Expand Up @@ -322,6 +325,7 @@ export async function createApp<
plugins?: T;
telemetry?: TelemetryConfig;
cache?: CacheConfig;
taskflow?: TaskflowConfig | false;
client?: WorkspaceClient;
onPluginsReady?: (appkit: PluginMap<T>) => void | Promise<void>;
disableInternalTelemetry?: boolean;
Expand Down
3 changes: 3 additions & 0 deletions packages/appkit/src/core/service-manager.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { CacheConfig } from "shared";
import { CacheManager } from "../cache";
import { createLogger } from "../logging";
import { type TaskflowConfig, TaskflowService } from "../taskflow";
import { type TelemetryConfig, TelemetryManager } from "../telemetry";

const logger = createLogger("services");
Expand Down Expand Up @@ -55,11 +56,13 @@ export class ServiceManager {
export async function startCoreServices(config: {
telemetry?: TelemetryConfig;
cache?: CacheConfig;
taskflow?: TaskflowConfig | false;
}): Promise<ServiceManager> {
const services = new ServiceManager();
try {
services.add("telemetry", await TelemetryManager.boot(config.telemetry));
services.add("cache", await CacheManager.boot(config.cache));
services.add("taskflow", await TaskflowService.boot(config.taskflow));
return services;
} catch (error) {
await services.stop();
Expand Down
31 changes: 30 additions & 1 deletion packages/appkit/src/plugin/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import type { PluginContext } from "../core/plugin-context";
import { AppKitError, AuthenticationError } from "../errors";
import { createLogger } from "../logging/logger";
import { StreamManager } from "../stream";
import { TaskflowService } from "../taskflow";
import {
type ITelemetry,
normalizeTelemetryOptions,
Expand Down Expand Up @@ -90,6 +91,9 @@ const EXCLUDED_FROM_PROXY = new Set([
"asUser",
// Internal methods
"constructor",
// Synchronous accessor for the shared singleton — no need for an
// extra async-context frame.
"requireTaskflow",
]);

/**
Expand Down Expand Up @@ -187,6 +191,13 @@ export abstract class Plugin<
protected telemetry!: ITelemetry;
protected context?: PluginContext;

/**
* Durable execution service. `null` when the app opted out via
* `createApp({ taskflow: false })`. Use {@link requireTaskflow} when
* the plugin needs it to be present.
*/
protected taskflow: TaskflowService | null = null;

/** Registered endpoints for this plugin */
private registeredEndpoints: PluginEndpointMap = {};

Expand Down Expand Up @@ -232,6 +243,7 @@ export abstract class Plugin<
this.name,
this.config.telemetry,
);
this.taskflow = TaskflowService.tryGetInstance();
this.isReady = true;
}

Expand All @@ -242,20 +254,37 @@ export abstract class Plugin<
telemetryConfig?: BasePluginConfig["telemetry"];
} = {},
): void {
const fromLocator = deps.services;
this.cache =
deps.services?.get<CacheManager>("cache") ??
fromLocator?.get<CacheManager>("cache") ??
this.cache ??
CacheManager.getInstanceSync();
this.telemetry = TelemetryManager.getProvider(
this.name,
deps.telemetryConfig ?? this.config.telemetry,
);
this.taskflow = fromLocator
? fromLocator.get<TaskflowService>("taskflow")
: TaskflowService.tryGetInstance();
if (deps.context !== undefined) {
this.context = deps.context as PluginContext;
}
this.isReady = true;
}

/**
* Returns the live {@link TaskflowService}, or throws if it was
* disabled via `createApp({ taskflow: false })`.
*/
protected requireTaskflow(): TaskflowService {
if (!this.taskflow) {
throw new Error(
`Plugin "${this.name}" requires TaskFlow but it was disabled via createApp({ taskflow: false }). Remove the opt-out or refactor the plugin to not use this.taskflow.`,
);
}
return this.taskflow;
}

injectRoutes(_: express.Router) {
return;
}
Expand Down
45 changes: 31 additions & 14 deletions packages/appkit/src/plugins/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ export class ServerPlugin extends Plugin {
protected declare config: ServerConfig;
private serverExtensions: ((app: express.Application) => void)[] = [];
private rawBodyPaths: Set<string> = new Set();
/**
* Drains the core services in reverse boot order. Wired by `_createApp`;
* optional so plugins constructed in tests keep working.
*/
private _shutdownCoreServices?: () => Promise<void>;
static phase: PluginPhase = "deferred";

constructor(config: ServerConfig) {
Expand Down Expand Up @@ -102,22 +107,22 @@ export class ServerPlugin extends Plugin {
}

/**
* Start the server.
*
* This method starts the server and sets up the frontend.
* It also sets up the remote tunneling if enabled.
*
* @returns The express application.
* Starts the server, registers the frontend, and (if enabled) the
* remote tunnel. The plugin owns the SIGTERM handler; the optional
* `shutdownCoreServices` hook supplied by `_createApp` drains core
* services on shutdown.
*/
async start(): Promise<express.Application> {
async start(options?: {
shutdownCoreServices?: () => Promise<void>;
}): Promise<express.Application> {
this._shutdownCoreServices = options?.shutdownCoreServices;
this.serverApplication.use(requestMetricsMiddleware);
this.serverApplication.use(
express.json({
type: (req) => {
// Skip JSON parsing for routes that declared skipBodyParsing
// (e.g. file uploads where the raw body must flow through).
// rawBodyPaths is populated by extendRoutes() below; the type
// callback runs per-request so the set is already filled.
// Skip JSON parsing for routes that opted out (e.g. file
// uploads). `rawBodyPaths` is populated by `extendRoutes()`
// before any request hits this callback.
const urlPath = req.url?.split("?")[0];
if (urlPath && this.rawBodyPaths.has(urlPath)) return false;
const ct = req.headers["content-type"] ?? "";
Expand Down Expand Up @@ -391,6 +396,13 @@ export class ServerPlugin extends Plugin {
}
}

/**
* Graceful shutdown sequence:
* 1. Tear down server-local pieces (Vite, tunnel, telemetry reporter).
* 2. Abort in-flight plugin operations so streams release HTTP conns.
* 3. Drain core services via `_shutdownCoreServices` (reverse boot order).
* 4. Close the HTTP server, force-exit on timeout.
*/
private async _gracefulShutdown() {
logger.info("Starting graceful shutdown...");

Expand All @@ -404,7 +416,6 @@ export class ServerPlugin extends Plugin {

TelemetryReporter.getInstance()?.stop();

// 1. abort active operations from plugins
const shutdownPlugins = this.context?.getPlugins();
if (shutdownPlugins) {
for (const plugin of shutdownPlugins.values()) {
Expand All @@ -422,14 +433,20 @@ export class ServerPlugin extends Plugin {
}
}

// 2. close the server
if (this._shutdownCoreServices) {
try {
await this._shutdownCoreServices();
} catch (err) {
logger.error("Error shutting down core services: %O", err);
}
}

if (this.server) {
this.server.close(() => {
logger.debug("Server closed gracefully");
process.exit(0);
});

// 3. timeout to force shutdown after 15 seconds
setTimeout(() => {
logger.debug("Force shutdown after timeout");
process.exit(1);
Expand Down
89 changes: 88 additions & 1 deletion packages/appkit/src/plugins/server/tests/server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,6 @@ describe("ServerPlugin", () => {
}),
} as any);

// pretend started
(plugin as any).server = mockHttpServer;

await (plugin as any)._gracefulShutdown();
Expand All @@ -730,5 +729,93 @@ describe("ServerPlugin", () => {
exitSpy.mockRestore();
vi.useRealTimers();
});

test("invokes shutdownCoreServices after aborting and before closing the server", async () => {
vi.useFakeTimers();
mockLoggerError.mockClear();
const exitSpy = vi
.spyOn(process, "exit")
.mockImplementation(((_code?: number) => undefined) as any);

const callOrder: string[] = [];
const abortFn = vi.fn(() => {
callOrder.push("abort");
});
const shutdownCoreServices = vi.fn(async () => {
callOrder.push("shutdownCoreServices");
});
const closeSpy = vi.fn((cb: any) => {
callOrder.push("server.close");
cb?.();
});

const plugin = new ServerPlugin({
context: createContextWithPlugins({
plugin: { name: "plugin", abortActiveOperations: abortFn },
}),
} as any);

(plugin as any).server = { ...mockHttpServer, close: closeSpy };
(plugin as any)._shutdownCoreServices = shutdownCoreServices;

await (plugin as any)._gracefulShutdown();
vi.runAllTimers();

expect(abortFn).toHaveBeenCalled();
expect(shutdownCoreServices).toHaveBeenCalledTimes(1);
expect(closeSpy).toHaveBeenCalled();
expect(callOrder).toEqual([
"abort",
"shutdownCoreServices",
"server.close",
]);
expect(mockLoggerError).not.toHaveBeenCalled();

exitSpy.mockRestore();
vi.useRealTimers();
});

test("isolates errors from shutdownCoreServices and still closes the server", async () => {
vi.useFakeTimers();
mockLoggerError.mockClear();
const exitSpy = vi
.spyOn(process, "exit")
.mockImplementation(((_code?: number) => undefined) as any);

const shutdownCoreServices = vi.fn(async () => {
throw new Error("registry boom");
});

const plugin = new ServerPlugin({
context: createContextWithPlugins({
plugin: { name: "plugin", abortActiveOperations: vi.fn() },
}),
} as any);

(plugin as any).server = mockHttpServer;
(plugin as any)._shutdownCoreServices = shutdownCoreServices;

await (plugin as any)._gracefulShutdown();
vi.runAllTimers();

expect(shutdownCoreServices).toHaveBeenCalled();
expect(mockLoggerError).toHaveBeenCalled();
expect(mockHttpServer.close).toHaveBeenCalled();
expect(exitSpy).toHaveBeenCalled();

exitSpy.mockRestore();
vi.useRealTimers();
});

test("start() wires shutdownCoreServices into the instance", async () => {
const shutdownCoreServices = vi.fn(async () => {});
const plugin = new ServerPlugin({
context: createContextWithPlugins({}),
} as any);

await plugin.start({ shutdownCoreServices });

expect((plugin as any)._shutdownCoreServices).toBe(shutdownCoreServices);
});
});
});
Loading