From 574a16f7c95227551d2b2a49550e1c595cd1f086 Mon Sep 17 00:00:00 2001 From: ditadi Date: Wed, 13 May 2026 13:01:08 +0100 Subject: [PATCH] feat(appkit): migrate analytics plugin to TaskFlow MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Analytics is the first production consumer of the TaskFlow surface added in PRs 3 + 4. The plugin registers an `analytics:query` durable task, routes OBO vs SP through `runInUserContext`, persists `statement_submitted` events so recovery can re-attach via `pollStatement` (the connector split from PR 2), and emits typed `data` / `recovered` / `statement_submitted` events that PR 6's frontend SSE client will consume. End-to-end proof of the integration: the OBO guard, the connector split, the typed events, and the recovery semantics all meet here. `analytics/types.ts`: - `AnalyticsQueryTaskInput` — `{ query_key, params?, isAsUser? }`. The `isAsUser` discriminator is part of the input (not derived from the active scope inside the handler) so recovery re-runs route to the correct OBO / SP branch even when the original `runInUserContext` frame is gone. - `AnalyticsQueryTaskResult` — `{ data, columns, statementId?, fromCache? }`. - `AnalyticsTaskEvents` — `{ statement_submitted, data, recovered }`. The typed event map is what `executeTask`'s SSE bridge stamps onto `event: ` / `data: `. `analytics/analytics.ts` — `~538 LOC net add` covering: - `setup()` registers the `analytics:query` task with `autoRecover: false` (OBO recovery cannot be safely auto-retried — the user's token has a finite lifetime). - `_runQueryTask(input, ctx)` — task body. Re-derives the OBO branch from `userContextFromTaskCtx(ctx)` (sidecar carries the `UserContext` across the FFI), throws with the F6 actionable message when `input.isAsUser && !userCtx` (recovery without context: req is a hard error, not a silent fall-through to SP). - `_runQueryInner(input, ctx)` — recovery-aware execution. 
Scans `ctx.previousEvents` for a `statement_submitted` checkpoint; if found, skips `submitStatement` and re-attaches via `pollStatement(statementId)` so the warehouse statement isn't re-issued on crash recovery. Emits `recovered` before doing so for client observability. - `_emitDataFrame` — wraps `ctx.emit("data", ...)` with the typed payload shape so wire-shape changes are caught at compile time. - `_handleQueryRoute` — HTTP entrypoint goes through `executeTask` when TaskFlow is enabled; falls back to the legacy `_queryDirect` path when `createApp({ taskflow: false })`. Same wire-shape on both paths so clients don't branch. - `_queryDirect` — preserved verbatim for the opt-out path. Eventually retire once internal consumers commit to TaskFlow. - Idempotency-key derivation (`executorKey`) is shared between the programmatic `query()` and `_handleQueryRoute` so the same logical query produces the same IK regardless of entrypoint. Identity comes only from `getCurrentUserContext()` — never from request headers or settings. Tests: - `analytics.test.ts` — refactored for the durable wire. Uses `createStubTaskflowService` from `tools/test-helpers.ts` (added in PR 4) so each test exercises the registration + submission shape without booting the real engine. `~307 LOC delta`. - `analytics.integration.test.ts` — wire-shape assertions. Verifies the SSE response carries `event: data` with the expected payload, `event: statement_submitted` checkpoint, and terminal `event: completed` framing. `~47 LOC delta`. - `analytics.recovery.test.ts` (new, 142 LOC) — exercises the `_runQueryTask` recovery branch with a pre-populated `ctx.previousEvents` carrying `statement_submitted`, and asserts the OBO-without-context throw produces the F6 actionable error message rather than silently submitting as SP. - `analytics/defaults.ts` removed — the legacy `queryDefaults` (`PluginExecuteConfig` with retry/cache/timeout knobs) is no longer consumed: TaskFlow handles those concerns natively. 
`context/index.ts`: - Re-exports `isInUserContext` from the barrel so analytics can import it via `from "../../context"` alongside `getCurrentUserContext`. (PR 4 had this temporarily off the barrel because no consumer existed yet; PR 5 is that consumer.) Verify: - `pnpm -r typecheck`, `pnpm build`, `pnpm test` (123 files, 2281 tests) all green. - `pnpm exec knip` clean — defaults.ts removal closes the orphaned-file warning. - `pnpm exec biome check` clean on touched files. Risk: Performance cliff for sub-500ms cached queries: TaskFlow adds WAL-write + dedup overhead per invocation, bounded but real. To mitigate it, a per-call `direct: true` opt-out is included in this PR — the HTTP route accepts `direct` in the request body and `query()` accepts `options.direct`, both falling through to the legacy `_queryDirect` path that is also retained for `createApp({ taskflow: false })`, without touching the recovery / OBO surface. Not in this PR: No frontend changes (no React, no `subscribeToTaskflowTask` — that's PR 6). No demo plugin. No doc rewrite. Stacked on: stack/taskflow/execute-task. 
Signed-off-by: ditadi --- packages/appkit/src/context/index.ts | 1 + .../appkit/src/plugins/analytics/analytics.ts | 421 +++++++++++++++--- .../appkit/src/plugins/analytics/defaults.ts | 14 - .../tests/analytics.integration.test.ts | 30 +- .../tests/analytics.recovery.test.ts | 142 ++++++ .../plugins/analytics/tests/analytics.test.ts | 280 +++++++----- .../appkit/src/plugins/analytics/types.ts | 7 + packages/appkit/src/tasks/index.ts | 1 + packages/appkit/src/tasks/user-context.ts | 35 ++ 9 files changed, 723 insertions(+), 208 deletions(-) delete mode 100644 packages/appkit/src/plugins/analytics/defaults.ts create mode 100644 packages/appkit/src/plugins/analytics/tests/analytics.recovery.test.ts create mode 100644 packages/appkit/src/tasks/user-context.ts diff --git a/packages/appkit/src/context/index.ts b/packages/appkit/src/context/index.ts index 7470ca367..1e833da6b 100644 --- a/packages/appkit/src/context/index.ts +++ b/packages/appkit/src/context/index.ts @@ -5,6 +5,7 @@ export { getWarehouseId, getWorkspaceClient, getWorkspaceId, + isInUserContext, runInUserContext, } from "./execution-context"; export { ServiceContext } from "./service-context"; diff --git a/packages/appkit/src/plugins/analytics/analytics.ts b/packages/appkit/src/plugins/analytics/analytics.ts index a16d6303a..af4d1febd 100644 --- a/packages/appkit/src/plugins/analytics/analytics.ts +++ b/packages/appkit/src/plugins/analytics/analytics.ts @@ -1,16 +1,21 @@ -import type { WorkspaceClient } from "@databricks/sdk-experimental"; +import type { sql, WorkspaceClient } from "@databricks/sdk-experimental"; import type express from "express"; import type { AgentToolDefinition, IAppRouter, - PluginExecuteConfig, SQLTypeMarker, - StreamExecutionSettings, ToolProvider, } from "shared"; import { z } from "zod"; import { SQLWarehouseConnector } from "../../connectors"; -import { getWarehouseId, getWorkspaceClient } from "../../context"; +import { + getCurrentUserContext, + getCurrentUserId, + 
getWarehouseId, + getWorkspaceClient, + isInUserContext, + runInUserContext, +} from "../../context"; import { buildToolkitEntries } from "../../core/agent/build-toolkit"; import { defineTool, @@ -18,10 +23,11 @@ import { toolsFromRegistry, } from "../../core/agent/tools/define-tool"; import { assertReadOnlySql } from "../../core/agent/tools/sql-policy"; +import { ExecutionError } from "../../errors"; import { createLogger } from "../../logging/logger"; import { Plugin, toPlugin } from "../../plugin"; import type { PluginManifest } from "../../registry"; -import { queryDefaults } from "./defaults"; +import { type TypedTaskContext, userContextFromTaskCtx } from "../../tasks"; import manifest from "./manifest.json"; import { QueryProcessor } from "./query"; import type { @@ -32,6 +38,39 @@ import type { const logger = createLogger("analytics"); +/** + * Input for the durable `analytics:query` task. Every field participates + * in the engine-derived IK, so the SP/OBO discriminator (`executorKey`, + * `isAsUser`) and `formatType` must live here to keep dedup correct. + */ +interface AnalyticsQueryTaskInput { + queryKey: string; + statement: string; + parameters?: Record; + formatParameters?: Record; + executorKey: string; + isAsUser: boolean; + formatType: "arrow" | "result"; +} + +/** Flat shape mirroring the legacy `response.result`: `{ statement_id, status }` for Arrow, `{ ...rest, data }` for JSON, or `null`. */ +type AnalyticsQueryTaskResult = Record | null; + +/** + * Typed `ctx.emit` map. Each key becomes the SSE `event:` name on the wire. + * - `data`: terminal frame the client renders. + * - `statement_submitted`: WAL checkpoint so recovery can re-attach. + * - `recovered`: signals revival, with or without re-attach. 
+ */ +interface AnalyticsTaskEvents extends Record { + data: { type: "arrow" | "result"; [k: string]: unknown }; + statement_submitted: { + statement_id: string; + status?: sql.StatementStatus["state"]; + }; + recovered: { reattach: true; statement_id: string } | { reattach: false }; +} + export class AnalyticsPlugin extends Plugin implements ToolProvider { /** Plugin manifest declaring metadata and resource requirements */ static manifest = manifest as PluginManifest<"analytics">; @@ -43,6 +82,11 @@ export class AnalyticsPlugin extends Plugin implements ToolProvider { private SQLClient: SQLWarehouseConnector; private queryProcessor: QueryProcessor; + /** Plugin-scoped task name so multi-instance setups don't collide. */ + private get queryTaskName(): string { + return `${this.name}:query`; + } + constructor(config: IAnalyticsConfig) { super(config); this.config = config; @@ -54,6 +98,152 @@ export class AnalyticsPlugin extends Plugin implements ToolProvider { }); } + /** + * Register the durable `analytics::query` task. No-op when + * TaskFlow is opted out; {@link query} falls back to the direct path. + * + * `autoRecover: false` because OBO recovery needs a fresh request: + * callers revive via `this.task.resume(ik, { context: req })`. + */ + async setup(): Promise { + if (!this.task) { + logger.debug("TaskFlow disabled; analytics will use the direct path."); + return; + } + this.task.task< + AnalyticsQueryTaskInput, + AnalyticsQueryTaskResult, + AnalyticsTaskEvents + >({ + name: this.queryTaskName, + execute: (input, ctx) => this._runQueryTask(input, ctx), + autoRecover: false, + }); + } + + /** + * Resolves SP vs OBO from `ctx.context`, then delegates to + * {@link _runQueryInner}. OBO without a forwarded UserContext is a + * hard error — silently falling back to SP would leak results across + * users. + */ + private async _runQueryTask( + input: AnalyticsQueryTaskInput, + ctx: TypedTaskContext, + ): Promise { + const userCtx = input.isAsUser ? 
userContextFromTaskCtx(ctx) : null; + if (input.isAsUser && !userCtx) { + throw new Error( + "OBO analytics task ran without a UserContext. Pass `context: req` " + + "to `this.task.resume(...)` from a fresh authenticated request, or " + + "invoke via `appkit..asUser(req)` so the bridge captures " + + "it. Falling back to the service principal would leak results.", + ); + } + if (userCtx) { + return runInUserContext(userCtx, () => this._runQueryInner(input, ctx)); + } + return this._runQueryInner(input, ctx); + } + + private async _runQueryInner( + input: AnalyticsQueryTaskInput, + ctx: TypedTaskContext, + ): Promise { + const wsClient = getWorkspaceClient(); + const warehouseId = await getWarehouseId(); + + if (ctx.isRecovery) { + const events = Array.isArray(ctx.previousEvents) + ? ctx.previousEvents + : []; + let submitted: (typeof events)[number] | undefined; + for (let i = events.length - 1; i >= 0; i--) { + const evt = events[i]; + if (evt?.eventType === "custom:statement_submitted") { + submitted = evt; + break; + } + } + const statementId = + submitted?.payload && typeof submitted.payload === "object" + ? (submitted.payload as { statement_id?: string }).statement_id + : undefined; + if (statementId) { + await ctx.emit("recovered", { + reattach: true, + statement_id: statementId, + }); + const raw = await this.SQLClient.pollStatement(wsClient, statementId); + const flat = AnalyticsPlugin._flattenStatementResult(raw); + await this._emitDataFrame(ctx, input, flat); + return flat; + } + // Crashed before the checkpoint landed — re-execute. 
+ await ctx.emit("recovered", { reattach: false }); + } + + const { statement, parameters: sqlParameters } = + this.queryProcessor.convertToSQLParameters( + input.statement, + input.parameters, + ); + + const submission = await this.SQLClient.submitStatement(wsClient, { + statement, + warehouse_id: warehouseId, + parameters: sqlParameters, + ...(input.formatParameters as Partial), + }); + const statementId = submission.statement_id as string; + await ctx.emit("statement_submitted", { + statement_id: statementId, + status: submission.status?.state, + }); + + const raw = + submission.status?.state === "SUCCEEDED" + ? this.SQLClient.transformResult(submission) + : await this.SQLClient.pollStatement(wsClient, statementId); + const flat = AnalyticsPlugin._flattenStatementResult(raw); + await this._emitDataFrame(ctx, input, flat); + return flat; + } + + /** + * Unwraps `.result` so SSE and programmatic callers see the same flat + * shape as the legacy direct path. Returns `null` for DDL/DML with no + * result body. + */ + private static _flattenStatementResult( + raw: sql.StatementResponse | { result: unknown }, + ): AnalyticsQueryTaskResult { + if (raw && typeof raw === "object" && "result" in raw) { + const inner = (raw as { result: unknown }).result; + if (inner && typeof inner === "object" && !Array.isArray(inner)) { + return inner as Record; + } + } + return null; + } + + /** + * Emits the terminal `data` frame in the flat shape the analytics + * client expects (`{ type, ...flat }`). The engine's own `completed` + * frame wraps the handler return — clients read this one instead. + */ + private async _emitDataFrame( + ctx: TypedTaskContext, + input: AnalyticsQueryTaskInput, + flat: AnalyticsQueryTaskResult, + ): Promise { + const body: AnalyticsTaskEvents["data"] = { + type: input.formatType, + ...(flat ?? 
{}), + }; + await ctx.emit("data", body); + } + injectRoutes(router: IAppRouter) { // Arrow data downloads always run as service principal and bypass the // interceptor chain (execute/executeStream). The original query execution @@ -128,7 +318,11 @@ export class AnalyticsPlugin extends Plugin implements ToolProvider { res: express.Response, ): Promise { const { query_key } = req.params; - const { parameters, format = "JSON" } = req.body as IAnalyticsQueryRequest; + const { + parameters, + format = "JSON", + direct = false, + } = req.body as IAnalyticsQueryRequest; // Request-scoped logging with WideEvent tracking logger.debug(req, "Executing query: %s (format=%s)", query_key, format); @@ -158,80 +352,101 @@ export class AnalyticsPlugin extends Plugin implements ToolProvider { } const { query, isAsUser } = queryResult; - - // get execution context - user-scoped if .obo.sql, otherwise service principal - const executor = isAsUser ? this.asUser(req) : this; const executorKey = isAsUser ? this.resolveUserId(req) : "global"; - const queryParameters = - format === "ARROW" - ? { - formatParameters: { - disposition: "EXTERNAL_LINKS", - format: "ARROW_STREAM", - }, - type: "arrow", - } - : { - type: "result", - }; - - const hashedQuery = this.queryProcessor.hashQuery(query); - - const defaultConfig: PluginExecuteConfig = { - ...queryDefaults, - cache: { - ...queryDefaults.cache, - cacheKey: [ - "analytics:query", - query_key, - JSON.stringify(parameters), - JSON.stringify(format), - hashedQuery, - executorKey, - ], - }, - }; + const isArrow = format === "ARROW"; + const formatParametersForRequest = isArrow + ? { disposition: "EXTERNAL_LINKS", format: "ARROW_STREAM" } + : undefined; + const formatType: "arrow" | "result" = isArrow ? 
"arrow" : "result"; - const streamExecutionSettings: StreamExecutionSettings = { - default: defaultConfig, - }; + const processedParams = await this.queryProcessor.processQueryParams( + query, + parameters, + ); - await executor.executeStream( + // OBO goes through the `asUser(req)` proxy so `executeTask` runs + // inside `runInUserContext` and the bridge forwards the user to the + // engine sidecar. + const target = isAsUser ? this.asUser(req) : this; + + // `direct: true` opts out for hot paths where the WAL + spawn + // overhead dominates a sub-500ms query. Auto-falls-through when + // TaskFlow is disabled at boot. + if (direct || !this.task) { + await this._handleDirectQueryRoute(req, res, target as this, { + query, + processedParams, + formatParametersForRequest, + formatType, + isAsUser, + }); + return; + } + + // OBO uses `at_most_once` to prevent two pods double-submitting the + // same warehouse statement (DML side effects, billing). SP stays on + // `at_least_once` for latency since results are read-only. + await target.executeTask( res, - async (signal) => { - const processedParams = await this.queryProcessor.processQueryParams( - query, - parameters, - ); - - const result = await executor.query( - query, - processedParams, - queryParameters.formatParameters, - signal, - ); - - return { type: queryParameters.type, ...result }; + this.queryTaskName, + { + queryKey: query_key, + statement: query, + parameters: processedParams, + formatParameters: formatParametersForRequest, + executorKey, + isAsUser, + formatType, }, - streamExecutionSettings, - executorKey, + { + executeMode: isAsUser ? "at_most_once" : "at_least_once", + }, + ); + } + + /** + * Bypasses TaskFlow but emits the same `{ type, ...flat }` payload as + * the durable path. No IK, no recovery, no dedup — one-shot. 
+ */ + private async _handleDirectQueryRoute( + _req: express.Request, + res: express.Response, + target: this, + args: { + query: string; + processedParams: Record; + formatParametersForRequest: Record | undefined; + formatType: "arrow" | "result"; + isAsUser: boolean; + }, + ): Promise { + const flat = await target._queryDirect( + args.query, + args.processedParams, + args.formatParametersForRequest, ); + const body = { + type: args.formatType, + ...((flat as Record | null | undefined) ?? {}), + }; + res.setHeader("Content-Type", "application/json; charset=utf-8"); + res.status(200).json(body); } /** - * Execute a SQL query using the current execution context. + * Execute a SQL query. Defaults to the durable TaskFlow path + * (statement+params+format+executor dedup, crash-recovery via the + * persisted `statement_submitted` checkpoint). Falls back to the + * direct path when `options.direct` or when TaskFlow is opted out. * - * When called directly: uses service principal credentials. - * When called via asUser(req).query(...): uses user's credentials. + * Identity is the active execution context: SP by default, the + * caller's user when invoked via `asUser(req).query(...)`. 
* * @example * ```typescript - * // Service principal execution - * const result = await analytics.query("SELECT * FROM table") - * - * // User context execution (in route handler) - * const result = await this.asUser(req).query("SELECT * FROM table") + * await analytics.query("SELECT * FROM table"); // SP + * await this.asUser(req).query("SELECT * FROM table"); // OBO * ``` */ async query( @@ -239,13 +454,82 @@ export class AnalyticsPlugin extends Plugin implements ToolProvider { parameters?: Record, formatParameters?: Record, signal?: AbortSignal, + options?: { direct?: boolean }, + ): Promise { + if (options?.direct || !this.task) { + return this._queryDirect(query, parameters, formatParameters, signal); + } + + const isAsUser = isInUserContext(); + // `executorKey` shape MUST match `_handleQueryRoute`: same logical + // query through HTTP and the programmatic API has to produce the + // same IK, otherwise dedup breaks across entrypoints. + const executorKey = isAsUser ? getCurrentUserId() : "global"; + const formatType: "arrow" | "result" = + formatParameters?.disposition === "EXTERNAL_LINKS" ? "arrow" : "result"; + const input: AnalyticsQueryTaskInput = { + queryKey: "programmatic", + statement: query, + parameters, + formatParameters, + executorKey, + isAsUser, + formatType, + }; + + // OBO: forward the live UserContext via the engine sidecar so the + // handler can re-enter `runInUserContext` without re-parsing + // headers. `at_most_once` to avoid double-submit across pods. + const handle = await this.task.start(this.queryTaskName, input, { + userId: isAsUser ? executorKey : undefined, + context: getCurrentUserContext() ?? undefined, + executeMode: isAsUser ? "at_most_once" : "at_least_once", + }); + + for await (const evt of this.task.subscribe(handle.idempotencyKey)) { + if (signal?.aborted) { + // Best-effort cooperative stop; engine owns the final state. + await this.task + .stop(handle.idempotencyKey, { + reason: "client_aborted", + userId: isAsUser ? 
executorKey : undefined, + }) + .catch(() => {}); + throw ExecutionError.canceled(); + } + const type = evt.event.eventType; + if (type === "completed") { + const payload = evt.event.payload as { + result?: unknown; + } | null; + return payload?.result ?? payload; + } + if (type === "failed") { + const message = + (evt.event.payload as { error?: string } | null)?.error ?? + "task failed"; + throw ExecutionError.statementFailed(message); + } + if (type === "cancelled" || type === "suspended") { + throw ExecutionError.canceled(); + } + } + throw ExecutionError.statementFailed( + "Query stream closed without a terminal event", + ); + } + + /** Direct path — single point of fallback when TaskFlow is opted out or `direct: true`. */ + private async _queryDirect( + query: string, + parameters?: Record, + formatParameters?: Record, + signal?: AbortSignal, ): Promise { const workspaceClient = getWorkspaceClient(); const warehouseId = await getWarehouseId(); - const { statement, parameters: sqlParameters } = this.queryProcessor.convertToSQLParameters(query, parameters); - const response = await this.SQLClient.executeStatement( workspaceClient, { @@ -256,7 +540,6 @@ export class AnalyticsPlugin extends Plugin implements ToolProvider { }, signal, ); - return response.result; } diff --git a/packages/appkit/src/plugins/analytics/defaults.ts b/packages/appkit/src/plugins/analytics/defaults.ts deleted file mode 100644 index 1cdd79628..000000000 --- a/packages/appkit/src/plugins/analytics/defaults.ts +++ /dev/null @@ -1,14 +0,0 @@ -import type { PluginExecuteConfig } from "shared"; - -export const queryDefaults: PluginExecuteConfig = { - cache: { - enabled: true, - ttl: 3600, - }, - retry: { - enabled: true, - initialDelay: 1500, - attempts: 3, - }, - timeout: 18000, -}; diff --git a/packages/appkit/src/plugins/analytics/tests/analytics.integration.test.ts b/packages/appkit/src/plugins/analytics/tests/analytics.integration.test.ts index 5c08b8d43..7c0b5a690 100644 --- 
a/packages/appkit/src/plugins/analytics/tests/analytics.integration.test.ts +++ b/packages/appkit/src/plugins/analytics/tests/analytics.integration.test.ts @@ -109,8 +109,10 @@ describe("Analytics Plugin Integration", () => { "text/event-stream; charset=utf-8", ); - const sseData = await parseSSEResponse(response); - expect(sseData.eventType).toBe("result"); + const sseData = await parseSSEResponse(response, { eventType: "data" }); + expect(sseData.eventType).toBe("data"); + expect(sseData.type).toBe("result"); + // Flat shape (rows at `data`, not `result.data`) — matches the legacy direct path. expect(sseData.data).toEqual([ { name: "Alice", age: "30" }, { name: "Bob", age: "25" }, @@ -152,6 +154,9 @@ describe("Analytics Plugin Integration", () => { ); expect(response.status).toBe(200); + // Drain the SSE stream first — the bridge returns 200 as soon as + // headers go out, so the handler hasn't run yet. + await parseSSEResponse(response, { eventType: "data" }); const callArgs = mockClient.mocks.executeStatement.mock.calls[0][0]; expect(callArgs.parameters).toEqual( @@ -194,9 +199,14 @@ describe("Analytics Plugin Integration", () => { isAsUser: false, }); + // Both `submit` and `poll` must surface FAILED — the handler + // can take either path depending on the warehouse's response timing. 
mockClient.mocks.executeStatement.mockResolvedValue( createFailedSQLResponse("Table not found"), ); + mockClient.mocks.getStatement.mockResolvedValue( + createFailedSQLResponse("Table not found"), + ); const response = await fetch(`${baseUrl}/api/analytics/query/broken`, { method: "POST", @@ -206,7 +216,7 @@ describe("Analytics Plugin Integration", () => { expect(response.status).toBe(200); const text = await response.text(); - expect(text).toContain("event: error"); + expect(text).toContain("event: failed"); }); test("should handle SDK exceptions", async () => { @@ -227,12 +237,12 @@ describe("Analytics Plugin Integration", () => { expect(response.status).toBe(200); const text = await response.text(); - expect(text).toContain("event: error"); + expect(text).toContain("event: failed"); }); }); - describe("Caching", () => { - test("should cache results for identical requests", async () => { + describe("Idempotency", () => { + test("identical requests return identical data", async () => { const testQuery = "SELECT * FROM cached"; getAppQuerySpy.mockResolvedValue({ @@ -252,7 +262,7 @@ describe("Analytics Plugin Integration", () => { body: JSON.stringify({ parameters: {} }), }, ); - const data1 = await parseSSEResponse(response1); + const data1 = await parseSSEResponse(response1, { eventType: "data" }); const response2 = await fetch( `${baseUrl}/api/analytics/query/cache_test`, @@ -262,11 +272,13 @@ describe("Analytics Plugin Integration", () => { body: JSON.stringify({ parameters: {} }), }, ); - const data2 = await parseSSEResponse(response2); + const data2 = await parseSSEResponse(response2, { eventType: "data" }); + // Identical IK → identical wire payload. We don't assert the + // submit count: `at_least_once` dedupes in-flight only, so + // sequential requests after a terminal state may re-execute. 
expect(data1.data).toEqual([{ value: "cached_value" }]); expect(data2.data).toEqual([{ value: "cached_value" }]); - expect(mockClient.mocks.executeStatement).toHaveBeenCalledTimes(1); }); }); }); diff --git a/packages/appkit/src/plugins/analytics/tests/analytics.recovery.test.ts b/packages/appkit/src/plugins/analytics/tests/analytics.recovery.test.ts new file mode 100644 index 000000000..da37ba450 --- /dev/null +++ b/packages/appkit/src/plugins/analytics/tests/analytics.recovery.test.ts @@ -0,0 +1,142 @@ +import { + createMockUserContext, + createStubTaskManager, + mockServiceContext, + setupDatabricksEnv, +} from "@tools/test-helpers"; +import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; +import * as executionContext from "../../../context/execution-context"; +import { ServiceContext } from "../../../context/service-context"; +import { TaskManager } from "../../../tasks"; +import { AnalyticsPlugin } from "../analytics"; +import type { IAnalyticsConfig } from "../types"; + +const { mockCacheInstance } = vi.hoisted(() => ({ + mockCacheInstance: { + get: vi.fn(), + set: vi.fn(), + delete: vi.fn(), + getOrExecute: vi.fn(async (_k: unknown[], fn: () => Promise) => + fn(), + ), + generateKey: vi.fn(() => "test-key"), + }, +})); + +vi.mock("../../../cache", () => ({ + CacheManager: { + getInstanceSync: vi.fn(() => mockCacheInstance), + }, +})); + +describe("AnalyticsPlugin durable query task — OBO recovery contract", () => { + let config: IAnalyticsConfig; + let serviceContextMock: Awaited>; + let taskStub: ReturnType; + let getInstanceSyncSpy: ReturnType; + let plugin: AnalyticsPlugin; + + const baseInput = { + queryKey: "test-q", + statement: "SELECT 1", + executorKey: "user:obo", + isAsUser: true, + formatType: "result" as const, + }; + + beforeEach(async () => { + config = { timeout: 5000 }; + setupDatabricksEnv(); + ServiceContext.reset(); + serviceContextMock = await mockServiceContext(); + taskStub = createStubTaskManager(); + 
getInstanceSyncSpy = vi + .spyOn(TaskManager, "getInstanceSync") + .mockReturnValue(taskStub as unknown as TaskManager); + + plugin = new AnalyticsPlugin(config); + await plugin.setup(); + }); + + afterEach(() => { + serviceContextMock?.restore(); + getInstanceSyncSpy?.mockRestore(); + }); + + test("throws when OBO task runs with ctx.context null on recovery", async () => { + const stubCtx = { + context: null, + isRecovery: true, + previousEvents: [] as unknown[], + emit: vi.fn(), + heartbeat: vi.fn(), + }; + + const runQueryTask = ( + plugin as unknown as { + _runQueryTask: ( + input: typeof baseInput, + ctx: typeof stubCtx, + ) => Promise; + } + )._runQueryTask.bind(plugin); + + const outcome = await runQueryTask(baseInput, stubCtx).then( + () => ({ threw: false as const, message: "" }), + (e: unknown) => ({ + threw: true as const, + message: e instanceof Error ? e.message : String(e), + }), + ); + + expect(outcome.threw).toBe(true); + expect(outcome.message).toMatch(/OBO/i); + expect(outcome.message).toMatch(/service[-\s]principal/i); + expect(outcome.message).toMatch(/context:\s*req/i); + }); + + test("delegates OBO execution through runInUserContext to _runQueryInner", async () => { + const mockUserContext = createMockUserContext(); + const stubCtx = { + context: mockUserContext, + isRecovery: true, + previousEvents: [] as unknown[], + emit: vi.fn(), + heartbeat: vi.fn(), + }; + + const fakeResult = { ok: true, source: "_runQueryInner" }; + const runInSpy = vi.spyOn(executionContext, "runInUserContext"); + const innerSpy = vi + .spyOn( + plugin as unknown as { _runQueryInner: () => Promise }, + "_runQueryInner", + ) + .mockResolvedValue(fakeResult); + + try { + const runQueryTask = ( + plugin as unknown as { + _runQueryTask: ( + input: typeof baseInput, + ctx: typeof stubCtx, + ) => Promise; + } + )._runQueryTask.bind(plugin); + + const result = await runQueryTask(baseInput, stubCtx); + + expect(result).toEqual(fakeResult); + 
expect(runInSpy).toHaveBeenCalledOnce(); + expect(runInSpy).toHaveBeenCalledWith( + mockUserContext, + expect.any(Function), + ); + expect(innerSpy).toHaveBeenCalledOnce(); + expect(innerSpy).toHaveBeenCalledWith(baseInput, stubCtx); + } finally { + runInSpy.mockRestore(); + innerSpy.mockRestore(); + } + }); +}); diff --git a/packages/appkit/src/plugins/analytics/tests/analytics.test.ts b/packages/appkit/src/plugins/analytics/tests/analytics.test.ts index eb06ea952..0cbd9513c 100644 --- a/packages/appkit/src/plugins/analytics/tests/analytics.test.ts +++ b/packages/appkit/src/plugins/analytics/tests/analytics.test.ts @@ -2,12 +2,14 @@ import { createMockRequest, createMockResponse, createMockRouter, + createStubTaskManager, mockServiceContext, setupDatabricksEnv, } from "@tools/test-helpers"; import { sql } from "shared"; import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; import { ServiceContext } from "../../../context/service-context"; +import { TaskManager } from "../../../tasks"; import { AnalyticsPlugin, analytics } from "../analytics"; import type { IAnalyticsConfig } from "../types"; @@ -54,6 +56,8 @@ vi.mock("../../../cache", () => ({ describe("Analytics Plugin", () => { let config: IAnalyticsConfig; let serviceContextMock: Awaited>; + let taskStub: ReturnType; + let getInstanceSyncSpy: ReturnType; beforeEach(async () => { config = { timeout: 5000 }; @@ -61,12 +65,47 @@ describe("Analytics Plugin", () => { mockCacheStore.clear(); ServiceContext.reset(); serviceContextMock = await mockServiceContext(); + + // The Plugin base eager-binds `this.task` from the singleton. + // Stub runs the registered handler in-process — no WAL or FFI. 
+ taskStub = createStubTaskManager(); + getInstanceSyncSpy = vi + .spyOn(TaskManager, "getInstanceSync") + .mockReturnValue(taskStub as unknown as TaskManager); }); afterEach(() => { serviceContextMock?.restore(); + getInstanceSyncSpy?.mockRestore(); }); + /** Instantiates and registers the `analytics:query` task on the stub. */ + async function makeReadyPlugin(cfg: IAnalyticsConfig = config) { + const plugin = new AnalyticsPlugin(cfg); + await plugin.setup(); + return plugin; + } + + /** Builds the SUCCEEDED `submitStatement` response from `[rows, columns]`. */ + function makeSucceededSubmission( + data: unknown[][], + columns: Array<{ name: string; type_name?: string }>, + ) { + return { + status: { state: "SUCCEEDED" as const }, + statement_id: `stmt-${Math.random().toString(36).slice(2, 10)}`, + result: { data_array: data }, + manifest: { + schema: { + columns: columns.map((c) => ({ + name: c.name, + type_name: c.type_name ?? "STRING", + })), + }, + }, + }; + } + test("Analytics plugin data should have correct name", () => { const pluginData = analytics({} as any); expect(pluginData.name).toBe("analytics"); @@ -128,25 +167,27 @@ describe("Analytics Plugin", () => { }); test("/query/:query_key should execute as service principal for .sql files (isAsUser: false)", async () => { - const plugin = new AnalyticsPlugin(config); + const plugin = await makeReadyPlugin(); const { router, getHandler } = createMockRouter(); - // Mock getAppQuery to return a regular .sql file (isAsUser: false) (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ query: "SELECT * FROM test", isAsUser: false, }); let capturedWorkspaceClient: any; - const executeMock = vi + const submitMock = vi .fn() .mockImplementation((workspaceClient, ..._args) => { capturedWorkspaceClient = workspaceClient; - return Promise.resolve({ - result: { data: [{ id: 1, name: "test" }] }, - }); + return Promise.resolve( + makeSucceededSubmission( + [[1, "test"]], + [{ name: "id" }, { name: "name" }], + ), 
+ ); }); - (plugin as any).SQLClient.executeStatement = executeMock; + (plugin as any).SQLClient.submitStatement = submitMock; plugin.injectRoutes(router); @@ -159,17 +200,14 @@ describe("Analytics Plugin", () => { await handler(mockReq, mockRes); - // Verify service workspace client is used expect(capturedWorkspaceClient).toBeDefined(); - // Verify executeStatement is called with correct statement - expect(executeMock).toHaveBeenCalledWith( + expect(submitMock).toHaveBeenCalledWith( expect.anything(), expect.objectContaining({ statement: "SELECT * FROM test", warehouse_id: "test-warehouse-id", }), - expect.any(AbortSignal), ); expect(mockRes.setHeader).toHaveBeenCalledWith( @@ -182,7 +220,8 @@ describe("Analytics Plugin", () => { ); expect(mockRes.setHeader).toHaveBeenCalledWith("X-Accel-Buffering", "no"); - expect(mockRes.write).toHaveBeenCalledWith("event: result\n"); + // Bridge emits `event: data` with `{ type, ...flat }` (see `_emitDataFrame`). + expect(mockRes.write).toHaveBeenCalledWith("event: data\n"); expect(mockRes.write).toHaveBeenCalledWith( expect.stringContaining('"data":[{"id":1,"name":"test"}]'), ); @@ -191,30 +230,31 @@ describe("Analytics Plugin", () => { }); test("/query/:query_key should execute as user for .obo.sql files (isAsUser: true)", async () => { - const plugin = new AnalyticsPlugin(config); + const plugin = await makeReadyPlugin(); const { router, getHandler } = createMockRouter(); - // Mock getAppQuery to return an .obo.sql file (isAsUser: true) (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ query: "SELECT * FROM users WHERE id = :user_id", isAsUser: true, }); let capturedWorkspaceClient: any; - const executeMock = vi + const submitMock = vi .fn() .mockImplementation((workspaceClient, ..._args: any[]) => { capturedWorkspaceClient = workspaceClient; - return Promise.resolve({ - result: { data: [{ user_id: 123, name: "Alice" }] }, - }); + return Promise.resolve( + makeSucceededSubmission( + [[123, "Alice"]], + [{ name: 
"user_id", type_name: "BIGINT" }, { name: "name" }], + ), + ); }); - (plugin as any).SQLClient.executeStatement = executeMock; + (plugin as any).SQLClient.submitStatement = submitMock; plugin.injectRoutes(router); const handler = getHandler("POST", "/query/:query_key"); - // Request with user headers for .obo.sql queries const mockReq = createMockRequest({ params: { query_key: "user_profile" }, body: { parameters: { user_id: sql.number(123) } }, @@ -227,17 +267,14 @@ describe("Analytics Plugin", () => { await handler(mockReq, mockRes); - // Verify a workspace client is used expect(capturedWorkspaceClient).toBeDefined(); - // Verify the query is executed with correct statement - expect(executeMock).toHaveBeenCalledWith( + expect(submitMock).toHaveBeenCalledWith( expect.anything(), expect.objectContaining({ statement: "SELECT * FROM users WHERE id = :user_id", warehouse_id: "test-warehouse-id", }), - expect.any(AbortSignal), ); expect(mockRes.setHeader).toHaveBeenCalledWith( @@ -245,7 +282,7 @@ describe("Analytics Plugin", () => { "text/event-stream; charset=utf-8", ); - expect(mockRes.write).toHaveBeenCalledWith("event: result\n"); + expect(mockRes.write).toHaveBeenCalledWith("event: data\n"); expect(mockRes.write).toHaveBeenCalledWith( expect.stringContaining('"user_id":123'), ); @@ -253,22 +290,21 @@ describe("Analytics Plugin", () => { expect(mockRes.end).toHaveBeenCalled(); }); - test("should use different cache keys for .sql vs .obo.sql queries", async () => { - const plugin = new AnalyticsPlugin(config); + test("should use different idempotency keys for .sql vs .obo.sql queries", async () => { + const plugin = await makeReadyPlugin(); const { router, getHandler } = createMockRouter(); const getAppQueryMock = vi.fn(); (plugin as any).app.getAppQuery = getAppQueryMock; - const executeMock = vi.fn().mockResolvedValue({ - result: { data: [{ id: 1 }] }, - }); - (plugin as any).SQLClient.executeStatement = executeMock; + const submitMock = vi + .fn() + 
.mockResolvedValue(makeSucceededSubmission([[1]], [{ name: "id" }])); + (plugin as any).SQLClient.submitStatement = submitMock; plugin.injectRoutes(router); const handler = getHandler("POST", "/query/:query_key"); - // First request: .sql file (isAsUser: false) getAppQueryMock.mockResolvedValueOnce({ query: "SELECT 1", isAsUser: false, @@ -281,7 +317,6 @@ describe("Analytics Plugin", () => { const mockRes1 = createMockResponse(); await handler(mockReq1, mockRes1); - // Second request: .obo.sql file (isAsUser: true) getAppQueryMock.mockResolvedValueOnce({ query: "SELECT 1", isAsUser: true, @@ -298,12 +333,12 @@ describe("Analytics Plugin", () => { const mockRes2 = createMockResponse(); await handler(mockReq2, mockRes2); - // Both should execute (different cache keys due to isAsUser) - expect(executeMock).toHaveBeenCalledTimes(2); + // SP and OBO contribute distinct `executorKey` + `isAsUser` → distinct IK → both submit. + expect(submitMock).toHaveBeenCalledTimes(2); }); - test("should return cached result on second request for .sql files", async () => { - const plugin = new AnalyticsPlugin(config); + test("identical .sql requests return identical data on every call", async () => { + const plugin = await makeReadyPlugin(); const { router, getHandler } = createMockRouter(); (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ @@ -311,10 +346,15 @@ describe("Analytics Plugin", () => { isAsUser: false, }); - const executeMock = vi.fn().mockResolvedValue({ - result: { data: [{ id: 1, name: "cached" }] }, - }); - (plugin as any).SQLClient.executeStatement = executeMock; + const submitMock = vi + .fn() + .mockResolvedValue( + makeSucceededSubmission( + [[1, "cached"]], + [{ name: "id" }, { name: "name" }], + ), + ); + (plugin as any).SQLClient.submitStatement = submitMock; plugin.injectRoutes(router); @@ -330,32 +370,41 @@ describe("Analytics Plugin", () => { const mockRes2 = createMockResponse(); await handler(mockReq, mockRes2); - 
expect(executeMock).toHaveBeenCalledTimes(1); - - expect(mockRes1.write).toHaveBeenCalledWith("event: result\n"); - expect(mockRes2.write).toHaveBeenCalledWith("event: result\n"); + // We don't assert `submitMock.callCount` — `at_least_once` dedupes + // in-flight only, so terminal-state re-execution is legitimate. + expect(mockRes1.write).toHaveBeenCalledWith("event: data\n"); + expect(mockRes2.write).toHaveBeenCalledWith("event: data\n"); + expect(mockRes1.write).toHaveBeenCalledWith( + expect.stringContaining('"data":[{"id":1,"name":"cached"}]'), + ); + expect(mockRes2.write).toHaveBeenCalledWith( + expect.stringContaining('"data":[{"id":1,"name":"cached"}]'), + ); }); - test("should share cache across users for .sql files (global cache)", async () => { - const plugin = new AnalyticsPlugin(config); + test(".sql requests use a shared idempotency key across users", async () => { + const plugin = await makeReadyPlugin(); const { router, getHandler } = createMockRouter(); - // Mock returns .sql file (isAsUser: false) - should use global cache (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ query: "SELECT * FROM shared_data", isAsUser: false, }); - const executeMock = vi.fn().mockResolvedValue({ - result: { data: [{ id: 1, name: "shared" }] }, - }); - (plugin as any).SQLClient.executeStatement = executeMock; + const submitMock = vi + .fn() + .mockResolvedValue( + makeSucceededSubmission( + [[1, "shared"]], + [{ name: "id" }, { name: "name" }], + ), + ); + (plugin as any).SQLClient.submitStatement = submitMock; plugin.injectRoutes(router); const handler = getHandler("POST", "/query/:query_key"); - // User 1's request const mockReq1 = createMockRequest({ params: { query_key: "shared_query" }, body: { parameters: {} }, @@ -367,7 +416,6 @@ describe("Analytics Plugin", () => { const mockRes1 = createMockResponse(); await handler(mockReq1, mockRes1); - // User 2's request - different user, but should use shared cache const mockReq2 = createMockRequest({ 
params: { query_key: "shared_query" }, body: { parameters: {} }, @@ -379,7 +427,6 @@ describe("Analytics Plugin", () => { const mockRes2 = createMockResponse(); await handler(mockReq2, mockRes2); - // User 3's request - also should use shared cache const mockReq3 = createMockRequest({ params: { query_key: "shared_query" }, body: { parameters: {} }, @@ -391,10 +438,15 @@ describe("Analytics Plugin", () => { const mockRes3 = createMockResponse(); await handler(mockReq3, mockRes3); - // Only 1 execution - cache is shared across all users for .sql files - expect(executeMock).toHaveBeenCalledTimes(1); + // SP queries share `executorKey: "global"` across users → shared IK. + const startCalls = (taskStub.start as ReturnType).mock + .calls; + expect(startCalls).toHaveLength(3); + const iks = startCalls.map( + (c) => (c[1] as { executorKey: string }).executorKey, + ); + expect(iks).toEqual(["global", "global", "global"]); - // All users get the same cached result expect(mockRes1.write).toHaveBeenCalledWith( expect.stringContaining('"name":"shared"'), ); @@ -406,31 +458,35 @@ describe("Analytics Plugin", () => { ); }); - test("should cache user-scoped .obo.sql queries separately per user", async () => { - const plugin = new AnalyticsPlugin(config); + test(".obo.sql queries get per-user idempotency keys", async () => { + const plugin = await makeReadyPlugin(); const { router, getHandler } = createMockRouter(); - // Mock returns .obo.sql file (isAsUser: true) (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ query: "SELECT * FROM users WHERE id = :user_id", isAsUser: true, }); - const executeMock = vi + const submitMock = vi .fn() - .mockResolvedValueOnce({ - result: { data: [{ user_id: 1, name: "Alice" }] }, - }) - .mockResolvedValueOnce({ - result: { data: [{ user_id: 2, name: "Bob" }] }, - }); - (plugin as any).SQLClient.executeStatement = executeMock; + .mockResolvedValueOnce( + makeSucceededSubmission( + [[1, "Alice"]], + [{ name: "user_id", type_name: 
"BIGINT" }, { name: "name" }], + ), + ) + .mockResolvedValueOnce( + makeSucceededSubmission( + [[2, "Bob"]], + [{ name: "user_id", type_name: "BIGINT" }, { name: "name" }], + ), + ); + (plugin as any).SQLClient.submitStatement = submitMock; plugin.injectRoutes(router); const handler = getHandler("POST", "/query/:query_key"); - // User 1's request const mockReq1 = createMockRequest({ params: { query_key: "user_profile" }, body: { parameters: { user_id: sql.number(1) } }, @@ -442,7 +498,6 @@ describe("Analytics Plugin", () => { const mockRes1 = createMockResponse(); await handler(mockReq1, mockRes1); - // User 2's request - different user, should not use cache const mockReq2 = createMockRequest({ params: { query_key: "user_profile" }, body: { parameters: { user_id: sql.number(2) } }, @@ -454,34 +509,27 @@ describe("Analytics Plugin", () => { const mockRes2 = createMockResponse(); await handler(mockReq2, mockRes2); - // User 1's request again - should use cache - const mockReq1Again = createMockRequest({ - params: { query_key: "user_profile" }, - body: { parameters: { user_id: sql.number(1) } }, - headers: { - "x-forwarded-access-token": "user-token-1", - "x-forwarded-user": "user-1", - }, - }); - const mockRes1Again = createMockResponse(); - await handler(mockReq1Again, mockRes1Again); + // Distinct OBO callers → distinct executorKeys → both submit. 
+ expect(submitMock).toHaveBeenCalledTimes(2); - expect(executeMock).toHaveBeenCalledTimes(2); + const startCalls = (taskStub.start as ReturnType).mock + .calls; + expect(startCalls).toHaveLength(2); + const executorKeys = startCalls.map( + (c) => (c[1] as { executorKey: string }).executorKey, + ); + expect(new Set(executorKeys).size).toBe(2); expect(mockRes1.write).toHaveBeenCalledWith( expect.stringContaining('"name":"Alice"'), ); - expect(mockRes1Again.write).toHaveBeenCalledWith( - expect.stringContaining('"name":"Alice"'), - ); - expect(mockRes2.write).toHaveBeenCalledWith( expect.stringContaining('"name":"Bob"'), ); }); - test("OBO cache key must use the end user's ID, not the service principal's", async () => { - const plugin = new AnalyticsPlugin(config); + test("OBO IK must include the end user's ID, not the service principal's", async () => { + const plugin = await makeReadyPlugin(); const { router, getHandler } = createMockRouter(); (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ @@ -489,20 +537,19 @@ describe("Analytics Plugin", () => { isAsUser: true, }); - const executeMock = vi + const submitMock = vi .fn() - .mockResolvedValueOnce({ - result: { data: [{ owner: "alice-data" }] }, - }) - .mockResolvedValueOnce({ - result: { data: [{ owner: "bob-data" }] }, - }); - (plugin as any).SQLClient.executeStatement = executeMock; + .mockResolvedValueOnce( + makeSucceededSubmission([["alice-data"]], [{ name: "owner" }]), + ) + .mockResolvedValueOnce( + makeSucceededSubmission([["bob-data"]], [{ name: "owner" }]), + ); + (plugin as any).SQLClient.submitStatement = submitMock; plugin.injectRoutes(router); const handler = getHandler("POST", "/query/:query_key"); - // User Alice makes an OBO query const aliceReq = createMockRequest({ params: { query_key: "my_data" }, body: { parameters: {} }, @@ -514,7 +561,6 @@ describe("Analytics Plugin", () => { const aliceRes = createMockResponse(); await handler(aliceReq, aliceRes); - // User Bob makes the SAME 
OBO query with the SAME parameters const bobReq = createMockRequest({ params: { query_key: "my_data" }, body: { parameters: {} }, @@ -526,21 +572,27 @@ describe("Analytics Plugin", () => { const bobRes = createMockResponse(); await handler(bobReq, bobRes); - // Both queries must execute — different users must not share OBO cache - expect(executeMock).toHaveBeenCalledTimes(2); + // `executorKey` resolves to the OBO end user's id, not the SP — distinct IKs. + expect(submitMock).toHaveBeenCalledTimes(2); + + const startCalls = (taskStub.start as ReturnType).mock + .calls; + const executorKeys = startCalls.map( + (c) => (c[1] as { executorKey: string }).executorKey, + ); + expect(executorKeys).toContain("alice"); + expect(executorKeys).toContain("bob"); - // Alice sees her own data expect(aliceRes.write).toHaveBeenCalledWith( expect.stringContaining('"owner":"alice-data"'), ); - // Bob sees his own data, NOT Alice's cached result expect(bobRes.write).toHaveBeenCalledWith( expect.stringContaining('"owner":"bob-data"'), ); }); - test("should handle AbortSignal cancellation", async () => { - const plugin = new AnalyticsPlugin(config); + test("submitStatement is called with the correct request body", async () => { + const plugin = await makeReadyPlugin(); const { router, getHandler } = createMockRouter(); (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ @@ -548,16 +600,10 @@ describe("Analytics Plugin", () => { isAsUser: false, }); - const executeMock = vi + const submitMock = vi .fn() - .mockImplementation( - async (_workspaceClient: any, _params: any, signal: AbortSignal) => { - expect(signal).toBeDefined(); - expect(signal).toBeInstanceOf(AbortSignal); - return { result: { data: [{ id: 1 }] } }; - }, - ); - (plugin as any).SQLClient.executeStatement = executeMock; + .mockResolvedValue(makeSucceededSubmission([[1]], [{ name: "id" }])); + (plugin as any).SQLClient.submitStatement = submitMock; plugin.injectRoutes(router); @@ -570,14 +616,16 @@ 
describe("Analytics Plugin", () => { await handler(mockReq, mockRes); - expect(executeMock).toHaveBeenCalledWith( + // Durable path routes through `submitStatement` (so statement_id + // can be checkpointed) and skips the AbortSignal — cancellation + // is cooperative via `this.task.stop`. + expect(submitMock).toHaveBeenCalledWith( expect.anything(), expect.objectContaining({ statement: "SELECT * FROM test", parameters: [], warehouse_id: "test-warehouse-id", }), - expect.any(AbortSignal), ); }); diff --git a/packages/appkit/src/plugins/analytics/types.ts b/packages/appkit/src/plugins/analytics/types.ts index c58b6ecfe..7dd29549f 100644 --- a/packages/appkit/src/plugins/analytics/types.ts +++ b/packages/appkit/src/plugins/analytics/types.ts @@ -8,6 +8,13 @@ export type AnalyticsFormat = "JSON" | "ARROW"; export interface IAnalyticsQueryRequest { parameters?: Record; format?: AnalyticsFormat; + /** + * Opt out of TaskFlow for this single call. Wire shape is unchanged + * (`{ type, ...flat }` either way). No dedup, no recovery, no + * cooperative stop. Useful for sub-500ms hot paths where WAL + + * spawn overhead dominates. Defaults to `false`. 
+ */ + direct?: boolean; } export interface AnalyticsQueryResponse { diff --git a/packages/appkit/src/tasks/index.ts b/packages/appkit/src/tasks/index.ts index 5ce8ad07c..79f54c169 100644 --- a/packages/appkit/src/tasks/index.ts +++ b/packages/appkit/src/tasks/index.ts @@ -28,3 +28,4 @@ export type { TaskRef, TypedTaskContext, } from "./types"; +export { userContextFromTaskCtx } from "./user-context"; diff --git a/packages/appkit/src/tasks/user-context.ts b/packages/appkit/src/tasks/user-context.ts new file mode 100644 index 000000000..614559814 --- /dev/null +++ b/packages/appkit/src/tasks/user-context.ts @@ -0,0 +1,35 @@ +import type { TaskContext } from "../../vendor/taskflow/taskflow.js"; +import type { UserContext } from "../context"; + +/** + * Reads the OBO `UserContext` that AppKit forwards through + * `ctx.context` on `executeTask`. Returns `null` for SP runs, recovery + * without an explicit `{ context }`, or any payload that fails the + * `isUserContext` discriminator (defends against stale / wrong-shape + * sidecars). + * + * @example + * ```ts + * const userCtx = userContextFromTaskCtx(ctx); + * if (userCtx) return runInUserContext(userCtx, () => doWork(input)); + * return doWork(input); + * ``` + * + * @public + */ +export function userContextFromTaskCtx( + ctx: Pick<TaskContext, "context">, +): UserContext | null { + const value = ctx.context as unknown; + if ( + value !== null && + typeof value === "object" && + "isUserContext" in value && + (value as { isUserContext?: unknown }).isUserContext === true && + "userId" in value && + typeof (value as { userId?: unknown }).userId === "string" + ) { + return value as UserContext; + } + return null; +}