Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions packages/appkit/src/cache/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,22 @@ export class CacheManager {
return CacheManager.initPromise;
}

/** @internal */
static async boot(
config?: CacheConfig,
): Promise<{ instance: CacheManager; stop(): Promise<void> }> {
const mgr = await CacheManager.getInstance(config);
return { instance: mgr, stop: () => mgr.shutdown() };
}

/** @internal */
async shutdown(): Promise<void> {
await this.close();
this.inFlightRequests.clear();
CacheManager.instance = null;
CacheManager.initPromise = null;
}

/**
* Create a new cache manager instance
*
Expand Down
96 changes: 49 additions & 47 deletions packages/appkit/src/core/appkit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,27 @@ import type {
PluginMap,
} from "shared";
import { version as productVersion } from "../../package.json";
import { CacheManager } from "../cache";
import { ServiceContext } from "../context";
import {
isInternalTelemetryEnabled,
TelemetryReporter,
} from "../internal-telemetry";
import { createLogger } from "../logging/logger";
import { ResourceRegistry, ResourceType } from "../registry";
import type { TelemetryConfig } from "../telemetry";
import { TelemetryManager } from "../telemetry";
import { isToolProvider, PluginContext } from "./plugin-context";

const logger = createLogger("appkit");
import { type ServiceManager, startCoreServices } from "./service-manager";

export class AppKit<TPlugins extends InputPluginMap> {
#pluginInstances: Record<string, BasePlugin> = {};
#setupPromises: Promise<void>[] = [];
#context: PluginContext;
#services: ServiceManager;

private constructor(config: { plugins: TPlugins }) {
private constructor(config: { plugins: TPlugins }, services: ServiceManager) {
const { plugins, ...globalConfig } = config;

this.#context = new PluginContext();
this.#services = services;

const pluginEntries = Object.entries(plugins);

Expand Down Expand Up @@ -89,6 +87,7 @@ export class AppKit<TPlugins extends InputPluginMap> {
if (typeof pluginInstance.attachContext === "function") {
pluginInstance.attachContext({
context: this.#context,
services: this.#services,
telemetryConfig: baseConfig.telemetry,
});
}
Expand Down Expand Up @@ -199,56 +198,59 @@ export class AppKit<TPlugins extends InputPluginMap> {
disableInternalTelemetry?: boolean;
} = {},
): Promise<PluginMap<T>> {
// Initialize core services
TelemetryManager.initialize(config?.telemetry);
await CacheManager.getInstance(config?.cache);

const rawPlugins = config.plugins as T;

// Collect manifest resources via registry
const registry = new ResourceRegistry();
registry.collectResources(rawPlugins);

// Derive ServiceContext needs from what manifests declared
const needsWarehouse = registry
.getRequired()
.some((r) => r.type === ResourceType.SQL_WAREHOUSE);
await ServiceContext.initialize(
{ warehouseId: needsWarehouse },
config?.client,
);
const services = await startCoreServices({
telemetry: config?.telemetry,
cache: config?.cache,
});

// Validate env vars
registry.enforceValidation();
try {
const rawPlugins = config.plugins as T;

const preparedPlugins = AppKit.preparePlugins(rawPlugins);
const mergedConfig = {
plugins: preparedPlugins,
};
const resourceRegistry = new ResourceRegistry();
resourceRegistry.collectResources(rawPlugins);

const instance = new AppKit(mergedConfig);
const needsWarehouse = resourceRegistry
.getRequired()
.some((r) => r.type === ResourceType.SQL_WAREHOUSE);
await ServiceContext.initialize(
{ warehouseId: needsWarehouse },
config?.client,
);

await Promise.all(instance.#setupPromises);
await instance.#context.emitLifecycle("setup:complete");
resourceRegistry.enforceValidation();

const handle = instance as unknown as PluginMap<T>;
const preparedPlugins = AppKit.preparePlugins(rawPlugins);
const mergedConfig = {
plugins: preparedPlugins,
};

if (config.onPluginsReady) {
logger.debug("Running onPluginsReady hook");
await config.onPluginsReady(handle);
logger.debug("onPluginsReady hook completed");
}
const instance = new AppKit(mergedConfig, services);

if (isInternalTelemetryEnabled(config)) {
AppKit.bootstrapInternalTelemetry();
}
await Promise.all(instance.#setupPromises);
await instance.#context.emitLifecycle("setup:complete");

const serverPlugin = instance.#pluginInstances.server;
if (serverPlugin && typeof (serverPlugin as any).start === "function") {
await (serverPlugin as any).start();
}
const handle = instance as unknown as PluginMap<T>;

return handle;
if (config.onPluginsReady) {
await config.onPluginsReady(handle);
}

if (isInternalTelemetryEnabled(config)) {
AppKit.bootstrapInternalTelemetry();
}

const serverPlugin = instance.#pluginInstances.server;
if (serverPlugin && typeof (serverPlugin as any).start === "function") {
await (serverPlugin as any).start({
shutdownCoreServices: () => services.stop(),
});
}

return handle;
} catch (error) {
await services.stop();
throw error;
}
}

private static bootstrapInternalTelemetry(): void {
Expand Down
68 changes: 68 additions & 0 deletions packages/appkit/src/core/service-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import type { CacheConfig } from "shared";
import { CacheManager } from "../cache";
import { createLogger } from "../logging";
import { type TelemetryConfig, TelemetryManager } from "../telemetry";

const logger = createLogger("services");

interface ServiceEntry {
readonly name: string;
readonly instance: unknown;
stop(): Promise<void>;
}

/** Holds booted core services and resolves them by name. */
export class ServiceManager {
#services: ServiceEntry[] = [];

/** Adds a service. `null` is ignored so callers can pass an opt-out result. */
add(
name: string,
service: { instance: unknown; stop(): Promise<void> } | null,
): void {
if (!service) return;
this.#services.push({ name, ...service });
logger.debug("Started: %s", name);
}

get<T>(name: string): T | null {
for (const s of this.#services) {
if (s.name === name) return s.instance as T;
}
return null;
}

/** Stops services in reverse start order. Per-service failures are logged. */
async stop(): Promise<void> {
while (this.#services.length > 0) {
const s = this.#services.pop();
if (!s) continue;
try {
await s.stop();
logger.debug("Stopped: %s", s.name);
} catch (error) {
logger.error("Stop failed for %s: %O", s.name, error);
}
}
}
}

/**
* Boots the core services AppKit ships with. Adding a new service touches
* only this function and the service module itself — the rest of the core
* stays free of concrete service imports.
*/
export async function startCoreServices(config: {
telemetry?: TelemetryConfig;
cache?: CacheConfig;
}): Promise<ServiceManager> {
const services = new ServiceManager();
try {
services.add("telemetry", await TelemetryManager.boot(config.telemetry));
services.add("cache", await CacheManager.boot(config.cache));
return services;
} catch (error) {
await services.stop();
throw error;
}
}
74 changes: 74 additions & 0 deletions packages/appkit/src/core/tests/service-manager.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { describe, expect, test, vi } from "vitest";
import { ServiceManager } from "../service-manager";

function mockService(name: string, calls: string[]) {
return {
instance: { name },
stop: vi.fn(async () => {
calls.push(`stop:${name}`);
}),
};
}

describe("ServiceManager", () => {
test("get<T> returns the registered instance", () => {
const sm = new ServiceManager();
const svc = { instance: { hello: "world" }, stop: vi.fn() };
sm.add("greeter", svc);

expect(sm.get<{ hello: string }>("greeter")).toEqual({ hello: "world" });
});

test("get<T> returns null for unknown service", () => {
const sm = new ServiceManager();
expect(sm.get("missing")).toBeNull();
});

test("add(null) is a no-op (opt-out pattern)", async () => {
const sm = new ServiceManager();
sm.add("absent", null);

expect(sm.get("absent")).toBeNull();
await expect(sm.stop()).resolves.toBeUndefined();
});

test("stop() invokes services in reverse add order", async () => {
const calls: string[] = [];
const sm = new ServiceManager();
sm.add("a", mockService("a", calls));
sm.add("b", mockService("b", calls));
sm.add("c", mockService("c", calls));

await sm.stop();

expect(calls).toEqual(["stop:c", "stop:b", "stop:a"]);
});

test("stop() continues when one service throws", async () => {
const calls: string[] = [];
const sm = new ServiceManager();
sm.add("a", mockService("a", calls));
sm.add("b", {
instance: {},
stop: vi.fn(async () => {
throw new Error("boom");
}),
});
sm.add("c", mockService("c", calls));

await expect(sm.stop()).resolves.toBeUndefined();
expect(calls).toEqual(["stop:c", "stop:a"]);
});

test("stop() drains the registry (idempotent on second call)", async () => {
const calls: string[] = [];
const sm = new ServiceManager();
sm.add("a", mockService("a", calls));

await sm.stop();
await sm.stop();

expect(calls).toEqual(["stop:a"]);
expect(sm.get("a")).toBeNull();
});
});
30 changes: 11 additions & 19 deletions packages/appkit/src/plugin/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,14 +218,10 @@ export abstract class Plugin<
| PluginContext
| undefined;

// Eagerly bind telemetry + cache if the core services have already been
// initialized (normal createApp path, or tests that mock CacheManager).
// If they haven't, we leave these undefined and rely on `attachContext`
// being called later — this lets factories eagerly construct plugin
// instances at module top-level before `createApp` has run.
this.tryAttachContext();
}

// Fallback for plugins instantiated outside `_createApp` (test path).
private tryAttachContext(): void {
try {
this.cache = CacheManager.getInstanceSync();
Expand All @@ -239,23 +235,17 @@ export abstract class Plugin<
this.isReady = true;
}

/**
* Binds runtime dependencies (telemetry provider, cache, plugin context) to
* this plugin. Called by `AppKit._createApp` after construction and before
* `setup()`. Idempotent: safe to call if the constructor already bound them
* eagerly. Kept separate so factories can eagerly construct plugin instances
* without running this before `TelemetryManager.initialize()` /
* `CacheManager.getInstance()` have run.
*/
attachContext(
deps: {
context?: unknown;
services?: { get<T>(name: string): T | null };
telemetryConfig?: BasePluginConfig["telemetry"];
} = {},
): void {
if (!this.cache) {
this.cache = CacheManager.getInstanceSync();
}
this.cache =
deps.services?.get<CacheManager>("cache") ??
this.cache ??
CacheManager.getInstanceSync();
this.telemetry = TelemetryManager.getProvider(
this.name,
deps.telemetryConfig ?? this.config.telemetry,
Expand Down Expand Up @@ -698,10 +688,12 @@ export abstract class Plugin<
}

private _checkIfGenerator(
result: any,
): result is AsyncGenerator<any, void, unknown> {
result: unknown,
): result is AsyncGenerator<unknown, void, unknown> {
return (
result && typeof result === "object" && Symbol.asyncIterator in result
typeof result === "object" &&
result !== null &&
Symbol.asyncIterator in result
);
}
}
Loading
Loading