diff --git a/docs/docs/api/appkit/Class.ExecutionError.md b/docs/docs/api/appkit/Class.ExecutionError.md index 75886c4dc..eacfdc23d 100644 --- a/docs/docs/api/appkit/Class.ExecutionError.md +++ b/docs/docs/api/appkit/Class.ExecutionError.md @@ -22,6 +22,7 @@ throw new ExecutionError("Statement was canceled"); new ExecutionError(message: string, options?: { cause?: Error; context?: Record; + errorCode?: string; }): ExecutionError; ``` @@ -30,15 +31,16 @@ new ExecutionError(message: string, options?: { | Parameter | Type | | ------ | ------ | | `message` | `string` | -| `options?` | \{ `cause?`: `Error`; `context?`: `Record`\<`string`, `unknown`\>; \} | +| `options?` | \{ `cause?`: `Error`; `context?`: `Record`\<`string`, `unknown`\>; `errorCode?`: `string`; \} | | `options.cause?` | `Error` | | `options.context?` | `Record`\<`string`, `unknown`\> | +| `options.errorCode?` | `string` | #### Returns `ExecutionError` -#### Inherited from +#### Overrides [`AppKitError`](Class.AppKitError.md).[`constructor`](Class.AppKitError.md#constructor) @@ -86,6 +88,19 @@ Additional context for the error *** +### errorCode? + +```ts +readonly optional errorCode: string; +``` + +Structured error code from the upstream source (typically the warehouse's +`error_code` for statement-level failures, or the SDK's `ApiError.errorCode` +for HTTP failures). Preserved through wrapping so callers can branch on a +stable identifier without substring-matching the message. + +*** + ### isRetryable ```ts @@ -202,16 +217,17 @@ Create an execution error for closed/expired results ### statementFailed() ```ts -static statementFailed(errorMessage?: string): ExecutionError; +static statementFailed(errorMessage?: string, errorCode?: string): ExecutionError; ``` -Create an execution error for statement failure +Create an execution error for statement failure. #### Parameters -| Parameter | Type | -| ------ | ------ | -| `errorMessage?` | `string` | +| Parameter | Type | Description | +| ------ | ------ | ------ | +| `errorMessage?` | `string` | Human-readable error from the warehouse / SDK. | +| `errorCode?` | `string` | Structured code (e.g. "INVALID_PARAMETER_VALUE") to preserve through wrapping. Optional. | #### Returns diff --git a/packages/appkit-ui/src/js/sse/connect-sse.ts b/packages/appkit-ui/src/js/sse/connect-sse.ts index c4fd4500d..13d9053de 100644 --- a/packages/appkit-ui/src/js/sse/connect-sse.ts +++ b/packages/appkit-ui/src/js/sse/connect-sse.ts @@ -18,7 +18,11 @@ export async function connectSSE( lastEventId: initialLastEventId = null, retryDelay = 2000, maxRetries = 3, - maxBufferSize = 1024 * 1024, // 1MB + // 1 MiB — matches the server's `streamDefaults.maxEventSize`. SSE + // carries only short JSON control messages; bulk Arrow payloads flow + // over plain HTTP via `/api/analytics/arrow-result/:jobId`, so this + // buffer never needs to hold multi-MiB attachments. 
+ maxBufferSize = 1 * 1024 * 1024, timeout = 300000, // 5 minutes onError, } = options; diff --git a/packages/appkit-ui/src/react/charts/__tests__/types.test.ts b/packages/appkit-ui/src/react/charts/__tests__/types.test.ts index 13394dcf6..d6685ce01 100644 --- a/packages/appkit-ui/src/react/charts/__tests__/types.test.ts +++ b/packages/appkit-ui/src/react/charts/__tests__/types.test.ts @@ -93,7 +93,7 @@ describe("isQueryProps", () => { const props = { queryKey: "test_query", parameters: { limit: 100 }, - format: "json" as const, + format: "json_array" as const, }; expect(isQueryProps(props as any)).toBe(true); diff --git a/packages/appkit-ui/src/react/charts/types.ts b/packages/appkit-ui/src/react/charts/types.ts index 65804a741..ef738aad2 100644 --- a/packages/appkit-ui/src/react/charts/types.ts +++ b/packages/appkit-ui/src/react/charts/types.ts @@ -4,8 +4,22 @@ import type { Table } from "apache-arrow"; // Data Format Types // ============================================================================ -/** Supported data formats for analytics queries */ -export type DataFormat = "json" | "arrow" | "auto"; +/** + * Supported data formats for analytics queries. + * + * "json" and "arrow" are legacy aliases kept for backwards compatibility + * with appkit-ui < 0.33.0 — safe to remove once no consumer is on a + * pre-0.33.0 version. resolveFormat() normalizes them to their canonical + * equivalents before any downstream code reads the value. + */ +export type DataFormat = + | "json_array" + | "arrow_stream" + | "auto" + /** @deprecated Use "json_array". Safe to remove once no consumer is on appkit-ui < 0.33.0. */ + | "json" + /** @deprecated Use "arrow_stream". Safe to remove once no consumer is on appkit-ui < 0.33.0. */ + | "arrow"; /** Chart orientation */ export type Orientation = "vertical" | "horizontal"; @@ -77,8 +91,8 @@ export interface QueryProps extends ChartBaseProps { parameters?: Record; /** * Data format to use - * - "json": Use JSON format (smaller payloads, simpler) - * - "arrow": Use Arrow format (faster for large datasets) + * - "json_array": Use JSON format (smaller payloads, simpler) + * - "arrow_stream": Use Arrow format (faster for large datasets) * - "auto": Automatically select based on expected data size * @default "auto" */ diff --git a/packages/appkit-ui/src/react/hooks/__tests__/use-analytics-query.test.ts b/packages/appkit-ui/src/react/hooks/__tests__/use-analytics-query.test.ts new file mode 100644 index 000000000..1e748346c --- /dev/null +++ b/packages/appkit-ui/src/react/hooks/__tests__/use-analytics-query.test.ts @@ -0,0 +1,172 @@ +import { renderHook, waitFor } from "@testing-library/react"; +import { beforeEach, describe, expect, test, vi } from "vitest"; + +// Capture the onMessage handler so tests can drive SSE messages directly. +let lastConnectArgs: any = null; +const mockProcessArrowBuffer = vi.fn(); +const mockFetchArrow = vi.fn(); + +vi.mock("@/js", () => ({ + connectSSE: vi.fn((args: any) => { + lastConnectArgs = args; + return () => {}; + }), + ArrowClient: { + fetchArrow: (...args: unknown[]) => mockFetchArrow(...args), + processArrowBuffer: (...args: unknown[]) => mockProcessArrowBuffer(...args), + }, +})); + +// useQueryHMR is a no-op shim for tests; mock to avoid HMR side effects. 
+vi.mock("../use-query-hmr", () => ({ + useQueryHMR: vi.fn(), +})); + +import { useAnalyticsQuery } from "../use-analytics-query"; + +describe("useAnalyticsQuery", () => { + beforeEach(() => { + vi.clearAllMocks(); + lastConnectArgs = null; + }); + + test("fetches an arrow message (warehouse statement id) via /arrow-result", async () => { + const fakeTable = { numRows: 1, schema: { fields: [] } }; + const fakeBytes = new Uint8Array([1, 2, 3]); + mockFetchArrow.mockResolvedValueOnce(fakeBytes); + mockProcessArrowBuffer.mockResolvedValueOnce(fakeTable); + + const { result } = renderHook(() => + useAnalyticsQuery("q", null, { format: "ARROW_STREAM" }), + ); + + await lastConnectArgs.onMessage({ + data: JSON.stringify({ type: "arrow", statement_id: "stmt-warehouse-1" }), + }); + + await waitFor(() => { + expect(result.current.data).toBe(fakeTable); + }); + + expect(mockFetchArrow).toHaveBeenCalledTimes(1); + expect(mockFetchArrow).toHaveBeenCalledWith( + "/api/analytics/arrow-result/stmt-warehouse-1", + ); + expect(mockProcessArrowBuffer).toHaveBeenCalledWith(fakeBytes); + }); + + test("fetches an arrow message with synthetic inline- id through the same /arrow-result path", async () => { + // The client must treat inline and external-links responses uniformly — + // it never decodes base64 locally. The /arrow-result route on the + // server is the only place that knows which path the bytes came from. + const fakeTable = { numRows: 1, schema: { fields: [] } }; + const fakeBytes = new Uint8Array([1, 2, 3, 4, 5]); + mockFetchArrow.mockResolvedValueOnce(fakeBytes); + mockProcessArrowBuffer.mockResolvedValueOnce(fakeTable); + + const { result } = renderHook(() => + useAnalyticsQuery("q", null, { format: "ARROW_STREAM" }), + ); + + await lastConnectArgs.onMessage({ + data: JSON.stringify({ + type: "arrow", + statement_id: "inline-abc-xyz", + }), + }); + + await waitFor(() => { + expect(result.current.data).toBe(fakeTable); + }); + + expect(mockFetchArrow).toHaveBeenCalledTimes(1); + expect(mockFetchArrow).toHaveBeenCalledWith( + "/api/analytics/arrow-result/inline-abc-xyz", + ); + }); + + test("surfaces an error when the arrow fetch fails", async () => { + mockFetchArrow.mockRejectedValueOnce(new Error("network")); + + const { result } = renderHook(() => + useAnalyticsQuery("q", null, { format: "ARROW_STREAM" }), + ); + + await lastConnectArgs.onMessage({ + data: JSON.stringify({ type: "arrow", statement_id: "stmt-1" }), + }); + + await waitFor(() => { + expect(result.current.error).toBe( + "Unable to load data, please try again", + ); + }); + expect(result.current.loading).toBe(false); + }); + + test("rejects the retired arrow_inline message type as schema-invalid", async () => { + // arrow_inline was the prior wire shape. The discriminated union no + // longer accepts it, so it falls through to the generic error/code + // branch — but critically, it must NEVER trigger ArrowClient calls. + const { result } = renderHook(() => + useAnalyticsQuery("q", null, { format: "ARROW_STREAM" }), + ); + + await lastConnectArgs.onMessage({ + data: JSON.stringify({ type: "arrow_inline", attachment: "AQID" }), + }); + + // Whatever the hook surfaces (error or noop), it must not have tried to + // decode the payload locally. + await waitFor(() => { + // Either an error is set or loading completed without data — both are + // acceptable, but processArrowBuffer must never run on a base64 input. 
+ expect( + result.current.loading || + result.current.error || + result.current.data === null, + ).toBeTruthy(); + }); + expect(mockProcessArrowBuffer).not.toHaveBeenCalled(); + expect(mockFetchArrow).not.toHaveBeenCalled(); + }); + + test("normalizes an empty result message (no data field) to []", async () => { + // The wire schema makes `data` optional — empty result sets may omit + // it. The hook must surface that as an explicit empty array rather + // than `undefined`, so callers can rely on `data` being either null + // (no message yet) or a value of the inferred result type. + const { result } = renderHook(() => + useAnalyticsQuery("q", null, { format: "JSON_ARRAY" }), + ); + + await lastConnectArgs.onMessage({ + data: JSON.stringify({ type: "result" }), + }); + + await waitFor(() => { + expect(result.current.data).toEqual([]); + }); + expect(result.current.loading).toBe(false); + expect(result.current.error).toBeNull(); + }); + + test("still handles type:result rows for JSON_ARRAY", async () => { + const { result } = renderHook(() => + useAnalyticsQuery("q", null, { format: "JSON_ARRAY" }), + ); + + await lastConnectArgs.onMessage({ + data: JSON.stringify({ + type: "result", + data: [{ id: 1 }, { id: 2 }], + }), + }); + + await waitFor(() => { + expect(result.current.data).toEqual([{ id: 1 }, { id: 2 }]); + }); + expect(mockProcessArrowBuffer).not.toHaveBeenCalled(); + expect(mockFetchArrow).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/appkit-ui/src/react/hooks/__tests__/use-chart-data.test.ts b/packages/appkit-ui/src/react/hooks/__tests__/use-chart-data.test.ts index a4d99a916..686aff317 100644 --- a/packages/appkit-ui/src/react/hooks/__tests__/use-chart-data.test.ts +++ b/packages/appkit-ui/src/react/hooks/__tests__/use-chart-data.test.ts @@ -72,7 +72,7 @@ describe("useChartData", () => { }); describe("format selection", () => { - test("uses JSON format when explicitly specified", () => { + test("uses JSON_ARRAY format when explicitly specified", () => { mockUseAnalyticsQuery.mockReturnValue({ data: [], loading: false, @@ -82,7 +82,7 @@ describe("useChartData", () => { renderHook(() => useChartData({ queryKey: "test", - format: "json", + format: "json_array", }), ); @@ -93,7 +93,7 @@ describe("useChartData", () => { ); }); - test("uses ARROW format when explicitly specified", () => { + test("uses ARROW_STREAM format when explicitly specified", () => { mockUseAnalyticsQuery.mockReturnValue({ data: [], loading: false, @@ -103,7 +103,7 @@ describe("useChartData", () => { renderHook(() => useChartData({ queryKey: "test", - format: "arrow", + format: "arrow_stream", }), ); @@ -114,7 +114,7 @@ describe("useChartData", () => { ); }); - test("auto-selects ARROW for large limit", () => { + test("auto-selects ARROW_STREAM for large limit", () => { mockUseAnalyticsQuery.mockReturnValue({ data: [], loading: false, @@ -136,7 +136,7 @@ describe("useChartData", () => { ); }); - test("auto-selects ARROW for date range queries", () => { + test("auto-selects ARROW_STREAM for date range queries", () => { mockUseAnalyticsQuery.mockReturnValue({ data: [], loading: false, @@ -205,7 +205,7 @@ describe("useChartData", () => { ); }); - test("auto-selects JSON by default when no heuristics match", () => { + test("auto-selects JSON_ARRAY by default when no heuristics match", () => { mockUseAnalyticsQuery.mockReturnValue({ data: [], loading: false, @@ -227,7 +227,7 @@ describe("useChartData", () => { ); }); - test("defaults to auto format (JSON) when format is not specified", () => { + 
test("defaults to auto format (JSON_ARRAY) when format is not specified", () => { mockUseAnalyticsQuery.mockReturnValue({ data: [], loading: false, @@ -353,7 +353,7 @@ describe("useChartData", () => { expect(result.current.isArrow).toBe(false); }); - test("isArrow reflects requested ARROW format when data is null", () => { + test("isArrow reflects requested ARROW_STREAM format when data is null", () => { mockUseAnalyticsQuery.mockReturnValue({ data: null, loading: true, @@ -361,13 +361,13 @@ describe("useChartData", () => { }); const { result } = renderHook(() => - useChartData({ queryKey: "test", format: "arrow" }), + useChartData({ queryKey: "test", format: "arrow_stream" }), ); expect(result.current.isArrow).toBe(true); }); - test("isArrow reflects requested JSON format when data is null", () => { + test("isArrow reflects requested JSON_ARRAY format when data is null", () => { mockUseAnalyticsQuery.mockReturnValue({ data: null, loading: true, @@ -375,7 +375,7 @@ describe("useChartData", () => { }); const { result } = renderHook(() => - useChartData({ queryKey: "test", format: "json" }), + useChartData({ queryKey: "test", format: "json_array" }), ); expect(result.current.isArrow).toBe(false); diff --git a/packages/appkit-ui/src/react/hooks/types.ts b/packages/appkit-ui/src/react/hooks/types.ts index 60fc5f63b..7c249bf0f 100644 --- a/packages/appkit-ui/src/react/hooks/types.ts +++ b/packages/appkit-ui/src/react/hooks/types.ts @@ -47,7 +47,7 @@ export interface TypedArrowTable< export interface UseAnalyticsQueryOptions< F extends AnalyticsFormat = "JSON_ARRAY", > { - /** Response format - "JSON_ARRAY" returns typed arrays, "ARROW_STREAM" returns TypedArrowTable */ + /** Response format - "JSON_ARRAY" (default) returns typed arrays, "ARROW_STREAM" uses Arrow (inline or external links) */ format?: F; /** Maximum size of serialized parameters in bytes */ diff --git a/packages/appkit-ui/src/react/hooks/use-analytics-query.ts b/packages/appkit-ui/src/react/hooks/use-analytics-query.ts index 0bd0b2f02..88419313f 100644 --- a/packages/appkit-ui/src/react/hooks/use-analytics-query.ts +++ b/packages/appkit-ui/src/react/hooks/use-analytics-query.ts @@ -1,4 +1,5 @@ import { useCallback, useEffect, useMemo, useRef, useState } from "react"; +import { AnalyticsSseMessage } from "shared"; import { ArrowClient, connectSSE } from "@/js"; import type { AnalyticsFormat, @@ -39,13 +40,13 @@ function getArrowStreamUrl(id: string) { * @param options - Analytics query settings including format * @returns Query result state with format-appropriate data type * - * @example JSON format (default) + * @example JSON_ARRAY format (default) * ```typescript * const { data } = useAnalyticsQuery("spend_data", params); * // data: Array<{ group_key: string; cost_usd: number; ... }> | null * ``` * - * @example Arrow format + * @example ARROW_STREAM format * ```typescript * const { data } = useAnalyticsQuery("spend_data", params, { format: "ARROW_STREAM" }); * // data: TypedArrowTable<{ group_key: string; cost_usd: number; ... }> | null @@ -120,20 +121,35 @@ export function useAnalyticsQuery< signal: abortController.signal, onMessage: async (message) => { try { - const parsed = JSON.parse(message.data); - - // success - JSON format - if (parsed.type === "result") { + const rawParsed = JSON.parse(message.data); + + // The error/code branch below predates the SSE wire schema and + // can fire for messages that don't match any AnalyticsSseMessage + // variant (e.g. server-side error events from executeStream). 
+ // Try schema validation first; if it fails, fall through to the + // generic error/code handling below. + const validated = AnalyticsSseMessage.safeParse(rawParsed); + const msg = validated.success ? validated.data : null; + + // success - JSON format. The wire schema makes `data` optional + // (e.g. an empty result set may omit it), so normalize the + // missing case to an explicit empty array rather than letting + // `undefined` bleed into the hook's `T | null` state. + if (msg?.type === "result") { setLoading(false); - setData(parsed.data as ResultType); + setData((msg.data ?? []) as ResultType); return; } - // success - Arrow format - if (parsed.type === "arrow") { + // success - Arrow format. Both INLINE (server-stashed, + // statement_id prefixed with "inline-") and EXTERNAL_LINKS + // (warehouse statement_id) flow through this single branch — the + // /arrow-result route dispatches based on the id prefix so the + // client doesn't need to know which path the bytes came from. + if (msg?.type === "arrow") { try { const arrowData = await ArrowClient.fetchArrow( - getArrowStreamUrl(parsed.statement_id), + getArrowStreamUrl(msg.statement_id), ); const table = await ArrowClient.processArrowBuffer(arrowData); setLoading(false); @@ -151,6 +167,11 @@ export function useAnalyticsQuery< } } + // The schema didn't match — fall through to error/code handling + // below for legacy error events or surface a malformed-payload + // error if no error fields are present. + const parsed = rawParsed; + // error if (parsed.type === "error" || parsed.error || parsed.code) { const errorMsg = @@ -166,6 +187,18 @@ export function useAnalyticsQuery< } return; } + + // The payload matched neither AnalyticsSseMessage nor an error + // event — surface a generic error rather than silently dropping it. + if (!validated.success) { + console.error( + "[useAnalyticsQuery] Malformed SSE payload", + validated.error.flatten(), + ); + setLoading(false); + setError("Unable to load data, please try again"); + return; + } } catch (error) { console.warn("[useAnalyticsQuery] Malformed message received", error); } diff --git a/packages/appkit-ui/src/react/hooks/use-chart-data.ts b/packages/appkit-ui/src/react/hooks/use-chart-data.ts index a90481a2e..64b6e167f 100644 --- a/packages/appkit-ui/src/react/hooks/use-chart-data.ts +++ b/packages/appkit-ui/src/react/hooks/use-chart-data.ts @@ -17,8 +17,8 @@ export interface UseChartDataOptions { parameters?: Record; /** * Data format preference - * - "json": Force JSON format - * - "arrow": Force Arrow format + * - "json_array": Force JSON format + * - "arrow_stream": Force Arrow format * - "auto": Auto-select based on heuristics * @default "auto" */ @@ -51,9 +51,10 @@ function resolveFormat( format: DataFormat, parameters?: Record, ): "JSON_ARRAY" | "ARROW_STREAM" { - // Explicit format selection - if (format === "json") return "JSON_ARRAY"; - if (format === "arrow") return "ARROW_STREAM"; + // Explicit format selection (legacy "json"/"arrow" accepted for back-compat + // with appkit-ui < 0.33.0 — see DataFormat in ../charts/types.ts). 
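+  // e.g. resolveFormat("json") and resolveFormat("json_array") both yield
+  // "JSON_ARRAY", so downstream code only ever sees the canonical pair.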
+ if (format === "json_array" || format === "json") return "JSON_ARRAY"; + if (format === "arrow_stream" || format === "arrow") return "ARROW_STREAM"; // Auto-selection heuristics if (format === "auto") { @@ -97,7 +98,7 @@ function resolveFormat( * // Force Arrow format * const { data } = useChartData({ * queryKey: "big_query", - * format: "arrow" + * format: "arrow_stream" * }); * ``` */ diff --git a/packages/appkit/package.json b/packages/appkit/package.json index 026065fa6..4846f88a6 100644 --- a/packages/appkit/package.json +++ b/packages/appkit/package.json @@ -73,6 +73,7 @@ "@opentelemetry/sdk-trace-base": "2.6.0", "@opentelemetry/semantic-conventions": "1.38.0", "@types/semver": "7.7.1", + "apache-arrow": "21.1.0", "dotenv": "16.6.1", "express": "4.22.0", "get-port": "7.2.0", diff --git a/packages/appkit/src/connectors/sql-warehouse/arrow-schema.ts b/packages/appkit/src/connectors/sql-warehouse/arrow-schema.ts new file mode 100644 index 000000000..17d099e37 --- /dev/null +++ b/packages/appkit/src/connectors/sql-warehouse/arrow-schema.ts @@ -0,0 +1,441 @@ +import { + Binary, + Bool, + type DataType, + DateDay, + Decimal, + DurationMicrosecond, + Field, + Float32, + Float64, + Int8, + Int16, + Int32, + Int64, + IntervalYearMonth, + List, + Map_, + Null, + Schema, + Struct, + Table, + TimestampMicrosecond, + tableToIPC, + Utf8, +} from "apache-arrow"; + +/** + * Parse a Databricks SQL type text (the value returned by the Statement + * Execution API in `ColumnInfo.type_text`) into an Apache Arrow DataType. + * + * Supports: + * - All scalar types (STRING, INT, BIGINT, DECIMAL, TIMESTAMP, etc.) + * - Parameterized scalars: DECIMAL(p,s), VARCHAR(n), CHAR(n) + * - Nested types: ARRAY, MAP, STRUCT + * - INTERVAL year-month and day-time variants + * - Backtick-quoted struct field names with embedded `` `` `` escapes + * + * Unknown or unparseable types fall back to Utf8 — empty-Table consumers + * still see a column with the right name; only the inner type is degraded. + */ +export function parseDatabricksType(typeText: string): DataType { + const parser = new TypeParser(typeText); + const result = parser.parseType(); + parser.expectEnd(); + return result; +} + +/** + * Build an empty Arrow IPC stream (base64-encoded) matching the column schema + * returned by the warehouse. Used so ARROW_STREAM responses with no rows still + * deliver a real Arrow Table to the client, preserving the hook's typed + * contract. + */ +export function buildEmptyArrowIPCBase64( + columns: Array<{ + name?: string; + type_text?: string; + type_name?: string; + }>, +): string { + const fields = columns.map((col, index) => { + const typeText = col.type_text ?? col.type_name ?? "STRING"; + let dataType: DataType; + try { + dataType = parseDatabricksType(typeText); + } catch { + dataType = new Utf8(); + } + const name = col.name && col.name.length > 0 ? 
col.name : `column_${index}`; + return new Field(name, dataType, true); + }); + const schema = new Schema(fields); + const table = new Table(schema); + const ipc = tableToIPC(table, "stream"); + return Buffer.from(ipc).toString("base64"); +} + +// ============================================================================ +// Recursive-descent parser +// ============================================================================ + +class TypeParser { + private readonly input: string; + private pos = 0; + + constructor(input: string) { + this.input = input; + } + + parseType(): DataType { + this.skipWs(); + + let name: string; + if (this.peek() === "`") { + name = this.consumeBacktickIdent(); + } else { + name = this.consumeIdent(); + } + const upper = name.toUpperCase(); + + this.skipWs(); + + if (upper === "INTERVAL") { + return this.parseInterval(); + } + + if (this.peek() === "(") { + this.consume("("); + const args = this.parseNumberArgs(); + this.consume(")"); + this.skipWs(); + return this.makeParameterized(upper, args); + } + + if (this.peek() === "<") { + this.consume("<"); + const result = this.makeGeneric(upper); + this.skipWs(); + this.consume(">"); + return result; + } + + return this.makeScalar(upper); + } + + expectEnd(): void { + this.skipWs(); + if (this.pos < this.input.length) { + throw new Error( + `Unexpected trailing input at position ${this.pos}: "${this.input.slice(this.pos)}"`, + ); + } + } + + // ─── Type constructors ─────────────────────────────────── + + private makeScalar(upper: string): DataType { + switch (upper) { + case "STRING": + case "VARIANT": + return new Utf8(); + case "VARCHAR": + case "CHAR": + return new Utf8(); + case "BINARY": + case "GEOGRAPHY": + case "GEOMETRY": + return new Binary(); + case "BOOLEAN": + case "BOOL": + return new Bool(); + case "TINYINT": + case "BYTE": + return new Int8(); + case "SMALLINT": + case "SHORT": + return new Int16(); + case "INT": + case "INTEGER": + return new Int32(); + case "BIGINT": + case "LONG": + return new Int64(); + case "FLOAT": + case "REAL": + return new Float32(); + case "DOUBLE": + return new Float64(); + case "DECIMAL": + case "NUMERIC": + case "DEC": + return new Decimal(0, 10, 128); + case "DATE": + return new DateDay(); + case "TIMESTAMP": + case "TIMESTAMP_LTZ": + return new TimestampMicrosecond("UTC"); + case "TIMESTAMP_NTZ": + return new TimestampMicrosecond(); + case "VOID": + case "NULL": + return new Null(); + default: + return new Utf8(); + } + } + + private makeParameterized(upper: string, args: number[]): DataType { + switch (upper) { + case "DECIMAL": + case "NUMERIC": + case "DEC": { + const precision = args[0] ?? 10; + const scale = args[1] ?? 0; + // Arrow JS Decimal constructor signature is (scale, precision, bitWidth). 
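+        // A 128-bit Arrow decimal holds up to 38 significant digits, which is
+        // also Databricks' maximum DECIMAL precision, so bitWidth 128 always
+        // suffices here.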
+ return new Decimal(scale, precision, 128); + } + case "VARCHAR": + case "CHAR": + return new Utf8(); + default: + return new Utf8(); + } + } + + private makeGeneric(upper: string): DataType { + switch (upper) { + case "ARRAY": { + const inner = this.parseType(); + return new List(new Field("item", inner, true)); + } + case "MAP": { + const keyType = this.parseType(); + this.skipWs(); + this.consume(","); + this.skipWs(); + const valueType = this.parseType(); + const entriesStruct = new Struct([ + new Field("key", keyType, false), + new Field("value", valueType, true), + ]); + return new Map_(new Field("entries", entriesStruct, false), false); + } + case "STRUCT": + return this.parseStructFields(); + default: + // Unknown generic — skip to matching '>' and fall back. + this.skipBalancedAngles(); + return new Utf8(); + } + } + + private parseStructFields(): DataType { + const fields: Field[] = []; + while (true) { + this.skipWs(); + if (this.peek() === ">") break; + + let name: string; + if (this.peek() === "`") { + name = this.consumeBacktickIdent(); + } else { + name = this.consumeIdent(); + } + + this.skipWs(); + this.consume(":"); + this.skipWs(); + + const type = this.parseType(); + + // Optional `NOT NULL` and `COMMENT '...'`. Both are accepted by + // Databricks DDL and may appear in `type_text`. + this.skipWs(); + while (this.peekKeyword("NOT")) { + this.consumeIdent(); + this.skipWs(); + if (this.peekKeyword("NULL")) { + this.consumeIdent(); + } + this.skipWs(); + } + if (this.peekKeyword("COMMENT")) { + this.consumeIdent(); + this.skipWs(); + this.consumeStringLiteral(); + this.skipWs(); + } + + fields.push(new Field(name, type, true)); + + this.skipWs(); + if (this.peek() === ",") { + this.consume(","); + } else { + break; + } + } + return new Struct(fields); + } + + private parseInterval(): DataType { + // Grammar: INTERVAL [TO ] + // YEAR / MONTH variants -> IntervalYearMonth + // DAY / HOUR / MINUTE / SECOND variants -> Duration(microsecond) + const seen: string[] = []; + while (this.pos < this.input.length) { + this.skipWs(); + const c = this.peek(); + if (c === "" || c === "," || c === ">" || c === ")") break; + const word = this.consumeIdent().toUpperCase(); + seen.push(word); + } + const isYearMonth = seen.some((w) => w === "YEAR" || w === "MONTH"); + return isYearMonth ? new IntervalYearMonth() : new DurationMicrosecond(); + } + + private parseNumberArgs(): number[] { + const args: number[] = []; + while (true) { + this.skipWs(); + if (this.peek() === ")") break; + args.push(this.consumeNumber()); + this.skipWs(); + if (this.peek() === ",") { + this.consume(","); + } else { + break; + } + } + return args; + } + + // ─── Token utilities ───────────────────────────────────── + + private peek(): string { + return this.input[this.pos] ?? ""; + } + + private peekKeyword(word: string): boolean { + const slice = this.input.slice(this.pos, this.pos + word.length); + if (slice.toUpperCase() !== word.toUpperCase()) return false; + // Must be followed by a non-identifier character (boundary check). + const next = this.input[this.pos + word.length] ?? ""; + return !/[A-Za-z0-9_]/.test(next); + } + + private consume(expected: string): void { + if (this.peek() !== expected) { + throw new Error( + `Expected "${expected}" at position ${this.pos}, got "${this.peek()}" in "${this.input}"`, + ); + } + this.pos++; + } + + private skipWs(): void { + while ( + this.pos < this.input.length && + /\s/.test(this.input[this.pos] ?? 
"") + ) { + this.pos++; + } + } + + private consumeIdent(): string { + const start = this.pos; + while ( + this.pos < this.input.length && + /[A-Za-z0-9_]/.test(this.input[this.pos] ?? "") + ) { + this.pos++; + } + if (this.pos === start) { + throw new Error( + `Expected identifier at position ${this.pos}, got "${this.peek()}" in "${this.input}"`, + ); + } + return this.input.slice(start, this.pos); + } + + private consumeBacktickIdent(): string { + this.consume("`"); + let value = ""; + while (this.pos < this.input.length) { + if (this.input[this.pos] === "`") { + if (this.input[this.pos + 1] === "`") { + value += "`"; + this.pos += 2; + continue; + } + break; + } + value += this.input[this.pos]; + this.pos++; + } + this.consume("`"); + return value; + } + + private consumeNumber(): number { + const start = this.pos; + while ( + this.pos < this.input.length && + /[0-9]/.test(this.input[this.pos] ?? "") + ) { + this.pos++; + } + if (this.pos === start) { + throw new Error( + `Expected number at position ${this.pos}, got "${this.peek()}" in "${this.input}"`, + ); + } + return Number.parseInt(this.input.slice(start, this.pos), 10); + } + + private consumeStringLiteral(): string { + const quote = this.peek(); + if (quote !== "'" && quote !== '"') { + throw new Error( + `Expected string literal at position ${this.pos}, got "${quote}" in "${this.input}"`, + ); + } + this.pos++; + let value = ""; + while (this.pos < this.input.length) { + const c = this.input[this.pos]; + if (c === "\\") { + // Escape sequence: keep the next char verbatim. + const next = this.input[this.pos + 1]; + if (next !== undefined) { + value += next; + this.pos += 2; + continue; + } + this.pos++; + continue; + } + if (c === quote) { + this.pos++; + return value; + } + value += c; + this.pos++; + } + throw new Error(`Unterminated string literal in "${this.input}"`); + } + + private skipBalancedAngles(): void { + let depth = 1; + while (this.pos < this.input.length && depth > 0) { + const c = this.peek(); + if (c === "<") depth++; + else if (c === ">") { + depth--; + if (depth === 0) return; + } + this.pos++; + } + } +} diff --git a/packages/appkit/src/connectors/sql-warehouse/client.ts b/packages/appkit/src/connectors/sql-warehouse/client.ts index d0a1c1816..a0016d7bd 100644 --- a/packages/appkit/src/connectors/sql-warehouse/client.ts +++ b/packages/appkit/src/connectors/sql-warehouse/client.ts @@ -21,10 +21,23 @@ import { SpanStatusCode, TelemetryManager, } from "../../telemetry"; +import { buildEmptyArrowIPCBase64 } from "./arrow-schema"; import { executeStatementDefaults } from "./defaults"; const logger = createLogger("connectors:sql-warehouse"); +/** + * Maximum size for inline Arrow IPC attachments (25 MiB decoded — the + * Databricks Statement Execution API hard cap on INLINE responses). + * + * Bulk Arrow payloads no longer traverse SSE — the analytics route stashes + * them via `InlineArrowStash` and the client fetches over HTTP — so this + * cap is bounded by the upstream API rather than our event-size budget. + * Larger results still fall through to `disposition: "EXTERNAL_LINKS"`, + * handled by the analytics format-fallback. 
+ */ +const MAX_INLINE_ATTACHMENT_BYTES = 25 * 1024 * 1024; + interface SQLWarehouseConfig { timeout?: number; telemetry?: TelemetryOptions; @@ -196,7 +209,10 @@ export class SQLWarehouseConnector { result = this._transformDataArray(response); break; case "FAILED": - throw ExecutionError.statementFailed(status.error?.message); + throw ExecutionError.statementFailed( + status.error?.message, + status.error?.error_code, + ); case "CANCELED": throw ExecutionError.canceled(); case "CLOSED": @@ -246,8 +262,17 @@ export class SQLWarehouseConnector { if (error instanceof AppKitError) { throw error; } + // Preserve the SDK's structured ApiError.errorCode (e.g. + // "INVALID_PARAMETER_VALUE", "BAD_REQUEST") through the wrap so + // callers can branch on a stable identifier rather than + // substring-matching the message. + const sdkErrorCode = + error && typeof error === "object" && "errorCode" in error + ? (error as { errorCode?: unknown }).errorCode + : undefined; throw ExecutionError.statementFailed( error instanceof Error ? error.message : String(error), + typeof sdkErrorCode === "string" ? sdkErrorCode : undefined, ); } finally { // remove abort handler @@ -360,7 +385,10 @@ export class SQLWarehouseConnector { span.setStatus({ code: SpanStatusCode.OK }); return this._transformDataArray(response); case "FAILED": - throw ExecutionError.statementFailed(status.error?.message); + throw ExecutionError.statementFailed( + status.error?.message, + status.error?.error_code, + ); case "CANCELED": throw ExecutionError.canceled(); case "CLOSED": @@ -382,12 +410,16 @@ export class SQLWarehouseConnector { message: error instanceof Error ? error.message : String(error), }); - // error logging is handled by executeStatement's catch block (gated on isAborted) if (error instanceof AppKitError) { throw error; } + const sdkErrorCode = + error && typeof error === "object" && "errorCode" in error + ? (error as { errorCode?: unknown }).errorCode + : undefined; throw ExecutionError.statementFailed( error instanceof Error ? error.message : String(error), + typeof sdkErrorCode === "string" ? sdkErrorCode : undefined, ); } finally { span.end(); @@ -399,7 +431,47 @@ export class SQLWarehouseConnector { private _transformDataArray(response: sql.StatementResponse) { if (response.manifest?.format === "ARROW_STREAM") { - return this.updateWithArrowStatus(response); + const result = response.result as + | (sql.ResultData & { attachment?: string }) + | undefined; + + // Inline Arrow: pass the base64 IPC attachment through unmodified so + // the analytics route can stream it to the client, where the existing + // ArrowClient infrastructure decodes it into a Table. Validate size + // here to fail fast on runaway payloads. + if (result?.attachment) { + return this._validateArrowAttachment(response, result.attachment); + } + + // External links: data fetched separately via statement_id. + if (result?.external_links) { + return this.updateWithArrowStatus(response); + } + + // Empty result with a known schema: synthesize a zero-row Arrow IPC + // attachment so the client always receives an Arrow Table for + // ARROW_STREAM, regardless of whether the warehouse returned data. + // Note: an empty array (`data_array: []`) is truthy, so length-check + // explicitly — otherwise zero-row responses fall through to the JSON + // row transform below and return `[]` JSON rows instead of an Arrow + // table. 
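+      // e.g. `result: {}` and `result: { data_array: [] }` both count as
+      // empty whenever the manifest still carries a schema.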
+ const hasNoRows = + !result?.data_array || + (Array.isArray(result.data_array) && result.data_array.length === 0); + if (hasNoRows && response.manifest?.schema?.columns) { + const synthesized = buildEmptyArrowIPCBase64( + response.manifest.schema.columns, + ); + return { + ...response, + result: { ...(result ?? {}), attachment: synthesized }, + }; + } + + // Inline data_array under ARROW_STREAM (rare): fall through to the + // row transform below. The hook will receive `type: "result"` rows; + // callers asking for ARROW_STREAM should not hit this path with + // current Databricks warehouses. } if (!response.result?.data_array || !response.manifest?.schema?.columns) { @@ -445,6 +517,41 @@ export class SQLWarehouseConnector { }; } + /** + * Validate (but do not decode) a base64 Arrow IPC attachment. + * Some serverless warehouses return inline results as Arrow IPC in + * `result.attachment`. We pass the base64 string through to the client, + * which decodes it into an Arrow Table via the existing ArrowClient + * infrastructure. This keeps the wire contract for ARROW_STREAM + * consistent (client always receives an Arrow Table) and avoids + * decode/re-encode work on the server. + */ + private _validateArrowAttachment( + response: sql.StatementResponse, + attachment: string, + ) { + // Cap the size to protect against unbounded inline payloads from + // misbehaving warehouses. 64 MiB is well above the typical inline limit + // (~25 MiB hard cap on the API) but bounds memory if a server returns + // a runaway response. + // + // Strip whitespace (rare but legal in base64) and account for trailing + // `=` padding so the byte count is exact rather than an upper bound. + const stripped = attachment.replace(/\s+/g, ""); + const padding = stripped.endsWith("==") + ? 2 + : stripped.endsWith("=") + ? 1 + : 0; + const decodedSize = Math.floor((stripped.length * 3) / 4) - padding; + if (decodedSize > MAX_INLINE_ATTACHMENT_BYTES) { + throw ExecutionError.statementFailed( + `Inline Arrow attachment exceeds maximum size (${decodedSize} > ${MAX_INLINE_ATTACHMENT_BYTES} bytes)`, + ); + } + return response; + } + private updateWithArrowStatus(response: sql.StatementResponse): { result: { statement_id: string; status: sql.StatementStatus }; } { diff --git a/packages/appkit/src/connectors/sql-warehouse/tests/arrow-schema.test.ts b/packages/appkit/src/connectors/sql-warehouse/tests/arrow-schema.test.ts new file mode 100644 index 000000000..e30b7315a --- /dev/null +++ b/packages/appkit/src/connectors/sql-warehouse/tests/arrow-schema.test.ts @@ -0,0 +1,514 @@ +import { + Binary, + Bool, + type DataType, + DateDay, + Decimal, + DurationMicrosecond, + Float32, + Float64, + Int8, + Int16, + Int32, + Int64, + IntervalYearMonth, + List, + Map_, + Null, + Struct, + TimestampMicrosecond, + Type, + tableFromIPC, + Utf8, +} from "apache-arrow"; +import { describe, expect, test } from "vitest"; +import { buildEmptyArrowIPCBase64, parseDatabricksType } from "../arrow-schema"; + +// ============================================================================ +// Helpers +// ============================================================================ + +/** Walk the type tree and produce a stable string representation for assertions. */ +function typeSummary(t: DataType): string { + if (t instanceof Decimal) return `Decimal(${t.precision},${t.scale})`; + if (t instanceof TimestampMicrosecond) { + const tz = (t as TimestampMicrosecond & { timezone?: string }).timezone; + return tz ? 
`Timestamp[us,${tz}]` : "Timestamp[us]"; + } + if (t instanceof List) { + const inner = (t.children?.[0]?.type as DataType | undefined) ?? new Utf8(); + return `List<${typeSummary(inner)}>`; + } + if (t instanceof Struct) { + const inner = (t.children ?? []) + .map((f) => `${f.name}:${typeSummary(f.type as DataType)}`) + .join(","); + return `Struct<${inner}>`; + } + if (t instanceof Map_) { + const entries = + (t.children?.[0]?.type as Struct | undefined)?.children ?? []; + const k = entries[0]?.type as DataType | undefined; + const v = entries[1]?.type as DataType | undefined; + return `Map<${typeSummary(k ?? new Utf8())},${typeSummary(v ?? new Utf8())}>`; + } + // Fall back to typeId for primitives. + return Type[t.typeId] ?? t.constructor.name; +} + +// ============================================================================ +// Scalar types +// ============================================================================ + +describe("parseDatabricksType — scalars", () => { + test.each([ + ["STRING", Utf8], + ["VARIANT", Utf8], + ["BINARY", Binary], + ["GEOGRAPHY", Binary], + ["GEOMETRY", Binary], + ["BOOLEAN", Bool], + ["BOOL", Bool], + ["TINYINT", Int8], + ["BYTE", Int8], + ["SMALLINT", Int16], + ["SHORT", Int16], + ["INT", Int32], + ["INTEGER", Int32], + ["BIGINT", Int64], + ["LONG", Int64], + ["FLOAT", Float32], + ["REAL", Float32], + ["DOUBLE", Float64], + ["DATE", DateDay], + ["VOID", Null], + ["NULL", Null], + ] as const)("%s parses to expected type", (input, ctor) => { + const t = parseDatabricksType(input); + expect(t).toBeInstanceOf(ctor); + }); + + test("case-insensitive — lowercase is accepted", () => { + expect(parseDatabricksType("string")).toBeInstanceOf(Utf8); + expect(parseDatabricksType("bigint")).toBeInstanceOf(Int64); + }); + + test("TIMESTAMP defaults to UTC tz", () => { + const t = parseDatabricksType("TIMESTAMP") as TimestampMicrosecond; + expect(t).toBeInstanceOf(TimestampMicrosecond); + expect(t.timezone).toBe("UTC"); + }); + + test("TIMESTAMP_LTZ behaves like TIMESTAMP", () => { + const t = parseDatabricksType("TIMESTAMP_LTZ") as TimestampMicrosecond; + expect(t.timezone).toBe("UTC"); + }); + + test("TIMESTAMP_NTZ has no timezone", () => { + const t = parseDatabricksType("TIMESTAMP_NTZ") as TimestampMicrosecond; + expect(t).toBeInstanceOf(TimestampMicrosecond); + expect(t.timezone == null || t.timezone === "").toBe(true); + }); + + test("Unknown scalar falls back to Utf8 (degraded but doesn't throw)", () => { + expect(parseDatabricksType("SOMETHING_NEW")).toBeInstanceOf(Utf8); + }); +}); + +// ============================================================================ +// Parameterized scalars +// ============================================================================ + +describe("parseDatabricksType — parameterized scalars", () => { + test("VARCHAR(255) → Utf8 (Arrow doesn't track string length)", () => { + expect(parseDatabricksType("VARCHAR(255)")).toBeInstanceOf(Utf8); + }); + + test("CHAR(10) → Utf8", () => { + expect(parseDatabricksType("CHAR(10)")).toBeInstanceOf(Utf8); + }); + + test("DECIMAL(10,2) → Decimal(precision=10, scale=2)", () => { + const t = parseDatabricksType("DECIMAL(10,2)") as Decimal; + expect(t).toBeInstanceOf(Decimal); + expect(t.precision).toBe(10); + expect(t.scale).toBe(2); + }); + + test("DECIMAL(38,0) — max precision, no scale", () => { + const t = parseDatabricksType("DECIMAL(38,0)") as Decimal; + expect(t.precision).toBe(38); + expect(t.scale).toBe(0); + }); + + test("NUMERIC(p,s) is an alias for DECIMAL(p,s)", () => { 
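+    // Spark SQL (and therefore Databricks) documents NUMERIC and DEC as
+    // synonyms of DECIMAL, so all three share the parameterized path.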
+    const t = parseDatabricksType("NUMERIC(15,4)") as Decimal;
+    expect(t).toBeInstanceOf(Decimal);
+    expect(t.precision).toBe(15);
+    expect(t.scale).toBe(4);
+  });
+
+  test("DEC(p,s) is an alias for DECIMAL(p,s)", () => {
+    const t = parseDatabricksType("DEC(7,3)") as Decimal;
+    expect(t.precision).toBe(7);
+    expect(t.scale).toBe(3);
+  });
+
+  test("DECIMAL with whitespace inside parens", () => {
+    const t = parseDatabricksType("DECIMAL( 10 , 2 )") as Decimal;
+    expect(t.precision).toBe(10);
+    expect(t.scale).toBe(2);
+  });
+
+  test("DECIMAL with single arg (precision only) defaults scale=0", () => {
+    const t = parseDatabricksType("DECIMAL(20)") as Decimal;
+    expect(t.precision).toBe(20);
+    expect(t.scale).toBe(0);
+  });
+
+  test("Bare DECIMAL falls back to default precision/scale", () => {
+    const t = parseDatabricksType("DECIMAL") as Decimal;
+    expect(t).toBeInstanceOf(Decimal);
+    expect(typeof t.precision).toBe("number");
+    expect(typeof t.scale).toBe("number");
+  });
+});
+
+// ============================================================================
+// INTERVAL types
+// ============================================================================
+
+describe("parseDatabricksType — INTERVAL", () => {
+  test("INTERVAL YEAR → IntervalYearMonth", () => {
+    expect(parseDatabricksType("INTERVAL YEAR")).toBeInstanceOf(
+      IntervalYearMonth,
+    );
+  });
+
+  test("INTERVAL MONTH → IntervalYearMonth", () => {
+    expect(parseDatabricksType("INTERVAL MONTH")).toBeInstanceOf(
+      IntervalYearMonth,
+    );
+  });
+
+  test("INTERVAL YEAR TO MONTH → IntervalYearMonth", () => {
+    expect(parseDatabricksType("INTERVAL YEAR TO MONTH")).toBeInstanceOf(
+      IntervalYearMonth,
+    );
+  });
+
+  test("INTERVAL DAY → DurationMicrosecond", () => {
+    expect(parseDatabricksType("INTERVAL DAY")).toBeInstanceOf(
+      DurationMicrosecond,
+    );
+  });
+
+  test("INTERVAL DAY TO SECOND → DurationMicrosecond", () => {
+    expect(parseDatabricksType("INTERVAL DAY TO SECOND")).toBeInstanceOf(
+      DurationMicrosecond,
+    );
+  });
+
+  test("INTERVAL HOUR TO MINUTE → DurationMicrosecond", () => {
+    expect(parseDatabricksType("INTERVAL HOUR TO MINUTE")).toBeInstanceOf(
+      DurationMicrosecond,
+    );
+  });
+});
+
+// ============================================================================
+// ARRAY
+// ============================================================================
+
+describe("parseDatabricksType — ARRAY", () => {
+  test("ARRAY<STRING> → List<Utf8>", () => {
+    const t = parseDatabricksType("ARRAY<STRING>") as List;
+    expect(t).toBeInstanceOf(List);
+    expect(t.children?.[0]?.type).toBeInstanceOf(Utf8);
+  });
+
+  test("ARRAY<INT> → List<Int32>", () => {
+    const t = parseDatabricksType("ARRAY<INT>") as List;
+    expect(t.children?.[0]?.type).toBeInstanceOf(Int32);
+  });
+
+  test("ARRAY<DECIMAL(10,2)> preserves precision/scale", () => {
+    const t = parseDatabricksType("ARRAY<DECIMAL(10,2)>") as List;
+    const inner = t.children?.[0]?.type as Decimal;
+    expect(inner).toBeInstanceOf(Decimal);
+    expect(inner.precision).toBe(10);
+    expect(inner.scale).toBe(2);
+  });
+
+  test("ARRAY<ARRAY<INT>> — nested twice", () => {
+    const t = parseDatabricksType("ARRAY<ARRAY<INT>>") as List;
+    const inner1 = t.children?.[0]?.type as List;
+    expect(inner1).toBeInstanceOf(List);
+    expect(inner1.children?.[0]?.type).toBeInstanceOf(Int32);
+  });
+
+  test("ARRAY<ARRAY<ARRAY<STRING>>> — three levels deep", () => {
+    expect(
+      typeSummary(parseDatabricksType("ARRAY<ARRAY<ARRAY<STRING>>>")),
+    ).toBe("List<List<List<Utf8>>>");
+  });
+
+  test("ARRAY with whitespace", () => {
+    const t = parseDatabricksType("ARRAY < STRING >") as List;
+    expect(t.children?.[0]?.type).toBeInstanceOf(Utf8);
+  });
+});
+
+// ============================================================================
+// MAP
+// ============================================================================
+
+describe("parseDatabricksType — MAP", () => {
+  test("MAP<STRING, INT>", () => {
+    expect(typeSummary(parseDatabricksType("MAP<STRING, INT>"))).toBe(
+      "Map<Utf8,Int>",
+    );
+  });
+
+  test("MAP<STRING , INT > — with whitespace", () => {
+    expect(typeSummary(parseDatabricksType("MAP<STRING , INT >"))).toBe(
+      "Map<Utf8,Int>",
+    );
+  });
+
+  test("MAP<STRING, ARRAY<INT>> — value is nested", () => {
+    expect(typeSummary(parseDatabricksType("MAP<STRING, ARRAY<INT>>"))).toBe(
+      "Map<Utf8,List<Int>>",
+    );
+  });
+
+  test("MAP<STRING, MAP<STRING, INT>> — fully nested", () => {
+    expect(
+      typeSummary(parseDatabricksType("MAP<STRING, MAP<STRING, INT>>")),
+    ).toBe("Map<Utf8,Map<Utf8,Int>>");
+  });
+});
+
+// ============================================================================
+// STRUCT
+// ============================================================================
+
+describe("parseDatabricksType — STRUCT", () => {
+  test("STRUCT<a:INT, b:STRING>", () => {
+    const t = parseDatabricksType("STRUCT<a:INT, b:STRING>") as Struct;
+    expect(t).toBeInstanceOf(Struct);
+    expect(t.children?.length).toBe(2);
+    expect(t.children?.[0]?.name).toBe("a");
+    expect(t.children?.[0]?.type).toBeInstanceOf(Int32);
+    expect(t.children?.[1]?.name).toBe("b");
+    expect(t.children?.[1]?.type).toBeInstanceOf(Utf8);
+  });
+
+  test("STRUCT with whitespace and many fields", () => {
+    const t = parseDatabricksType(
+      "STRUCT<id : BIGINT , name : STRING , ts : TIMESTAMP>",
+    ) as Struct;
+    expect(t.children?.map((f) => f.name)).toEqual(["id", "name", "ts"]);
+    expect(t.children?.[0]?.type).toBeInstanceOf(Int64);
+    expect(t.children?.[2]?.type).toBeInstanceOf(TimestampMicrosecond);
+  });
+
+  test("STRUCT with COMMENT on a field", () => {
+    const t = parseDatabricksType(
+      "STRUCT<id:INT COMMENT 'primary key', name:STRING>",
+    ) as Struct;
+    expect(t.children?.length).toBe(2);
+    expect(t.children?.[0]?.name).toBe("id");
+    expect(t.children?.[0]?.type).toBeInstanceOf(Int32);
+    expect(t.children?.[1]?.name).toBe("name");
+  });
+
+  test("STRUCT with COMMENT containing escaped quote", () => {
+    const t = parseDatabricksType(
+      "STRUCT<id:INT COMMENT 'it\\'s the id', name:STRING>",
+    ) as Struct;
+    expect(t.children?.length).toBe(2);
+    expect(t.children?.[0]?.name).toBe("id");
+  });
+
+  test("STRUCT with NOT NULL annotation on a field", () => {
+    const t = parseDatabricksType(
+      "STRUCT<id:INT NOT NULL, name:STRING>",
+    ) as Struct;
+    expect(t.children?.length).toBe(2);
+    expect(t.children?.[0]?.name).toBe("id");
+  });
+
+  test("STRUCT with backticked field name", () => {
+    const t = parseDatabricksType(
+      "STRUCT<`weird name`:INT, normal:STRING>",
+    ) as Struct;
+    expect(t.children?.[0]?.name).toBe("weird name");
+    expect(t.children?.[0]?.type).toBeInstanceOf(Int32);
+  });
+
+  test("STRUCT with backticked field name containing escaped backtick", () => {
+    const t = parseDatabricksType(
+      "STRUCT<`with``tick`:INT, other:STRING>",
+    ) as Struct;
+    expect(t.children?.[0]?.name).toBe("with`tick");
+  });
+
+  test("STRUCT with nested STRUCT", () => {
+    const t = parseDatabricksType(
+      "STRUCT<address:STRUCT<inner:INT>, name:STRING>",
+    ) as Struct;
+    expect(t.children?.length).toBe(2);
+    const nested = t.children?.[0]?.type as Struct;
+    expect(nested).toBeInstanceOf(Struct);
+    expect(nested.children?.[0]?.name).toBe("inner");
+    expect(nested.children?.[0]?.type).toBeInstanceOf(Int32);
+  });
+
+  test("Empty STRUCT<>", () => {
+    const t = parseDatabricksType("STRUCT<>") as Struct;
+    expect(t).toBeInstanceOf(Struct);
+    expect(t.children?.length).toBe(0);
+  });
+});
+
+// ============================================================================
+// Deep nesting / mixed types
+// ============================================================================
+
+describe("parseDatabricksType — deeply nested", () => {
+  test("MAP<STRING, ARRAY<MAP<STRING, INT>>>", () => {
+    expect(
+      typeSummary(
+        parseDatabricksType("MAP<STRING, ARRAY<MAP<STRING, INT>>>"),
+      ),
+    ).toBe("Map<Utf8,List<Map<Utf8,Int>>>");
+  });
+
+  test("ARRAY<MAP<STRING, ARRAY<STRUCT<x:INT>>>> — 4 levels mixed", () => {
+    expect(
+      typeSummary(
+        parseDatabricksType("ARRAY<MAP<STRING, ARRAY<STRUCT<x:INT>>>>"),
+      ),
+    ).toBe("List<Map<Utf8,List<Struct<x:Int>>>>");
+  });
+});
+
+// ============================================================================
+// Error / robustness behavior
+// ============================================================================
+
+describe("parseDatabricksType — error / robustness", () => {
+  test("trailing garbage throws", () => {
+    expect(() => parseDatabricksType("INT junk")).toThrow();
+  });
+
+  test("unmatched < throws", () => {
+    expect(() => parseDatabricksType("ARRAY<STRING")).toThrow();
+  });
+
+  test("unterminated DECIMAL args throw", () => {
+    expect(() => parseDatabricksType("DECIMAL(10,2")).toThrow();
+  });
+
+  test("empty string throws", () => {
+    expect(() => parseDatabricksType("")).toThrow();
+  });
+});
+
+// ============================================================================
+// buildEmptyArrowIPCBase64 — round-trip
+// ============================================================================
+
+describe("buildEmptyArrowIPCBase64", () => {
+  test("produces a decodable empty Arrow Table with the right schema", () => {
+    const columns = [
+      { name: "user_id", type_text: "BIGINT" },
+      { name: "name", type_text: "STRING" },
+      { name: "created_at", type_text: "TIMESTAMP" },
+      { name: "balance", type_text: "DECIMAL(10,2)" },
+      { name: "active", type_text: "BOOLEAN" },
+    ];
+    const b64 = buildEmptyArrowIPCBase64(columns);
+    const buf = Buffer.from(b64, "base64");
+    const table = tableFromIPC(buf);
+    expect(table.numRows).toBe(0);
+    expect(table.numCols).toBe(5);
+    expect(table.schema.fields.map((f) => f.name)).toEqual([
+      "user_id",
+      "name",
+      "created_at",
+      "balance",
+      "active",
+    ]);
+    expect(
+      (table.schema.fields[0]?.type as { bitWidth?: number }).bitWidth,
+    ).toBe(64);
+    expect(table.schema.fields[1]?.type).toBeInstanceOf(Utf8);
+    // After IPC round-trip Arrow JS resolves Timestamp* subclasses to a
+    // generic Timestamp with `unit` and `timezone`; assert structurally.
+    expect(table.schema.fields[2]?.type.typeId).toBe(Type.Timestamp);
+    expect((table.schema.fields[2]?.type as { unit?: number }).unit).toBe(2); // TimeUnit.MICROSECOND
+    const decimal = table.schema.fields[3]?.type as Decimal;
+    expect(decimal).toBeInstanceOf(Decimal);
+    expect(decimal.precision).toBe(10);
+    expect(decimal.scale).toBe(2);
+    expect(table.schema.fields[4]?.type).toBeInstanceOf(Bool);
+  });
+
+  test("round-trips nested types end-to-end", () => {
+    const columns = [
+      { name: "tags", type_text: "ARRAY<STRING>" },
+      { name: "meta", type_text: "STRUCT<a:INT, b:STRING>" },
+      { name: "counts", type_text: "MAP<STRING, BIGINT>" },
+    ];
+    const buf = Buffer.from(buildEmptyArrowIPCBase64(columns), "base64");
+    const table = tableFromIPC(buf);
+    expect(table.numRows).toBe(0);
+    expect(table.numCols).toBe(3);
+    expect(table.schema.fields[0]?.type).toBeInstanceOf(List);
+    expect(table.schema.fields[1]?.type).toBeInstanceOf(Struct);
+    expect(table.schema.fields[2]?.type).toBeInstanceOf(Map_);
+  });
+
+  test("falls back from type_text to type_name when type_text missing", () => {
+    const columns = [{ name: "id", type_name: "BIGINT" }];
+    const buf = Buffer.from(buildEmptyArrowIPCBase64(columns), "base64");
+    const table = tableFromIPC(buf);
+    expect(
+      (table.schema.fields[0]?.type as { bitWidth?: number }).bitWidth,
+    ).toBe(64);
+  });
+
+  test("unknown type degrades to Utf8 without throwing", () => {
+    const columns = [
+      { name: "id", type_text: "BIGINT" },
+      { name: "weird", type_text: "FUTURE_TYPE_NOT_YET_SUPPORTED" },
+    ];
+    const buf = Buffer.from(buildEmptyArrowIPCBase64(columns), "base64");
+    const table = tableFromIPC(buf);
+    expect(
+      (table.schema.fields[0]?.type as { bitWidth?: number }).bitWidth,
+    ).toBe(64);
+    expect(table.schema.fields[1]?.type).toBeInstanceOf(Utf8);
+  });
+
+  test("missing column name gets a synthesized placeholder", () => {
+    const columns = [{ type_text: "STRING" }, { name: "", type_text: "INT" }];
+    const buf = Buffer.from(buildEmptyArrowIPCBase64(columns), "base64");
+    const table = tableFromIPC(buf);
+    expect(table.schema.fields[0]?.name).toBe("column_0");
+    expect(table.schema.fields[1]?.name).toBe("column_1");
+  });
+
+  test("empty schema produces a valid 0-column 0-row Table", () => {
+    const buf = Buffer.from(buildEmptyArrowIPCBase64([]), "base64");
+    const table = tableFromIPC(buf);
+    expect(table.numRows).toBe(0);
+    expect(table.numCols).toBe(0);
+  });
+});
diff --git a/packages/appkit/src/connectors/sql-warehouse/tests/client.test.ts b/packages/appkit/src/connectors/sql-warehouse/tests/client.test.ts
new file mode 100644
index 000000000..c6780f3f1
--- /dev/null
+++ b/packages/appkit/src/connectors/sql-warehouse/tests/client.test.ts
@@ -0,0 +1,383 @@
+import type { sql } from "@databricks/sdk-experimental";
+import { tableFromIPC } from "apache-arrow";
+import { describe, expect, test, vi } from "vitest";
+
+vi.mock("../../../telemetry", () => {
+  const mockMeter = {
+    createCounter: () => ({ add: vi.fn() }),
+    createHistogram: () => ({ record: vi.fn() }),
+  };
+  return {
+    TelemetryManager: {
+      getProvider: () => ({
+        startActiveSpan: vi.fn(),
+        getMeter: () => mockMeter,
+      }),
+    },
+    SpanKind: { CLIENT: 1 },
+    SpanStatusCode: { ERROR: 2 },
+  };
+});
+vi.mock("../../../logging/logger", () => ({
+  createLogger: () => ({
+    info: vi.fn(),
+    debug: vi.fn(),
+    warn: vi.fn(),
+    error: vi.fn(),
+    event: () => null,
+  }),
+}));
+vi.mock("../../../stream/arrow-stream-processor", () => ({
+  ArrowStreamProcessor: vi.fn(),
+}));
+
+import { SQLWarehouseConnector } from "../client";
+
+function
createConnector() { + return new SQLWarehouseConnector({ timeout: 30000 }); +} + +// Real base64 Arrow IPC from a serverless warehouse returning +// `SELECT 1 AS test_col, 2 AS test_col2` with INLINE + ARROW_STREAM. +// Contains schema (two INT columns) + one record batch with values [1, 2]. +const REAL_ARROW_ATTACHMENT = + "/////7gAAAAQAAAAAAAKAAwACgAJAAQACgAAABAAAAAAAQQACAAIAAAABAAIAAAABAAAAAIAAABMAAAABAAAAMz///8QAAAAGAAAAAAAAQIUAAAAvP///yAAAAAAAAABAAAAAAkAAAB0ZXN0X2NvbDIAAAAQABQAEAAOAA8ABAAAAAgAEAAAABgAAAAgAAAAAAABAhwAAAAIAAwABAALAAgAAAAgAAAAAAAAAQAAAAAIAAAAdGVzdF9jb2wAAAAA/////7gAAAAQAAAADAAaABgAFwAEAAgADAAAACAAAAAAAQAAAAAAAAAAAAAAAAADBAAKABgADAAIAAQACgAAADwAAAAQAAAAAQAAAAAAAAAAAAAAAgAAAAEAAAAAAAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAAAAAAAAAAAQAAAAAAAAAAAAAAAEAAAAAAAAAQAAAAAAAAAAEAAAAAAAAAIAAAAAAAAAAAQAAAAAAAADAAAAAAAAAAAQAAAAAAAAA/wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD/AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAP////8AAAAA"; + +describe("SQLWarehouseConnector._transformDataArray", () => { + describe("classic warehouse (JSON_ARRAY + INLINE)", () => { + test("transforms data_array rows into named objects", () => { + const connector = createConnector(); + // Real response shape from classic warehouse: INLINE + JSON_ARRAY + const response = { + statement_id: "stmt-1", + status: { state: "SUCCEEDED" }, + manifest: { + format: "JSON_ARRAY", + schema: { + column_count: 2, + columns: [ + { + name: "test_col", + type_text: "INT", + type_name: "INT", + position: 0, + }, + { + name: "test_col2", + type_text: "INT", + type_name: "INT", + position: 1, + }, + ], + }, + total_row_count: 1, + truncated: false, + }, + result: { + data_array: [["1", "2"]], + }, + } as unknown as sql.StatementResponse; + + const result = (connector as any)._transformDataArray(response); + expect(result.result.data).toEqual([{ test_col: "1", test_col2: "2" }]); + expect(result.result.data_array).toBeUndefined(); + }); + + test("parses JSON strings in STRING columns", () => { + const connector = createConnector(); + const response = { + statement_id: "stmt-1", + status: { state: "SUCCEEDED" }, + manifest: { + format: "JSON_ARRAY", + schema: { + columns: [ + { name: "id", type_name: "INT" }, + { name: "metadata", type_name: "STRING" }, + ], + }, + }, + result: { + data_array: [["1", '{"key":"value"}']], + }, + } as unknown as sql.StatementResponse; + + const result = (connector as any)._transformDataArray(response); + expect(result.result.data[0].metadata).toEqual({ key: "value" }); + }); + }); + + describe("classic warehouse (EXTERNAL_LINKS + ARROW_STREAM)", () => { + test("returns statement_id for external links fetch", () => { + const connector = createConnector(); + // Real response shape from classic warehouse: EXTERNAL_LINKS + ARROW_STREAM + const response = { + statement_id: "stmt-1", + status: { state: "SUCCEEDED" }, + manifest: { + format: "ARROW_STREAM", + schema: { + columns: [ + { name: "test_col", type_name: "INT" }, + { name: "test_col2", type_name: "INT" }, + ], + }, + }, + result: { + external_links: [ + { + external_link: "https://storage.example.com/chunk0", + expiration: "2026-04-15T00:00:00Z", + }, + ], + }, + } as unknown as sql.StatementResponse; + + const result = (connector as any)._transformDataArray(response); + 
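      // updateWithArrowStatus projects the response down to
      // { statement_id, status }; the analytics route later streams the
      // actual Arrow bytes to the client over plain HTTP.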
expect(result.result.statement_id).toBe("stmt-1");
+      expect(result.result.data).toBeUndefined();
+    });
+  });
+
+  describe("serverless warehouse (INLINE + ARROW_STREAM with attachment)", () => {
+    test("passes attachment through unchanged for client-side decoding", () => {
+      const connector = createConnector();
+      // Real response shape from serverless warehouse: INLINE + ARROW_STREAM.
+      // Data arrives in result.attachment as base64-encoded Arrow IPC, not
+      // data_array; chunk metadata (total_chunk_count, chunks,
+      // total_row_count, truncated) sits on the manifest, not inside schema.
+      const response = {
+        statement_id: "00000001-test-stmt",
+        status: { state: "SUCCEEDED" },
+        manifest: {
+          format: "ARROW_STREAM",
+          schema: {
+            column_count: 2,
+            columns: [
+              {
+                name: "test_col",
+                type_text: "INT",
+                type_name: "INT",
+                position: 0,
+              },
+              {
+                name: "test_col2",
+                type_text: "INT",
+                type_name: "INT",
+                position: 1,
+              },
+            ],
+          },
+          total_chunk_count: 1,
+          chunks: [{ chunk_index: 0, row_offset: 0, row_count: 1 }],
+          total_row_count: 1,
+          truncated: false,
+        },
+        result: {
+          chunk_index: 0,
+          row_offset: 0,
+          row_count: 1,
+          attachment: REAL_ARROW_ATTACHMENT,
+        },
+      } as unknown as sql.StatementResponse;
+
+      const result = (connector as any)._transformDataArray(response);
+      expect(result.result.attachment).toBe(REAL_ARROW_ATTACHMENT);
+      expect(result.result.data).toBeUndefined();
+      // Preserves other result fields
+      expect(result.result.row_count).toBe(1);
+    });
+
+    test("preserves manifest and status alongside attachment", () => {
+      const connector = createConnector();
+      const response = {
+        statement_id: "00000001-test-stmt",
+        status: { state: "SUCCEEDED" },
+        manifest: {
+          format: "ARROW_STREAM",
+          schema: {
+            columns: [
+              { name: "test_col", type_name: "INT" },
+              { name: "test_col2", type_name: "INT" },
+            ],
+          },
+        },
+        result: {
+          chunk_index: 0,
+          row_count: 1,
+          attachment: REAL_ARROW_ATTACHMENT,
+        },
+      } as unknown as sql.StatementResponse;
+
+      const result = (connector as any)._transformDataArray(response);
+      // Manifest, statement_id, and attachment are all preserved
+      expect(result.manifest.format).toBe("ARROW_STREAM");
+      expect(result.statement_id).toBe("00000001-test-stmt");
+      expect(result.result.attachment).toBe(REAL_ARROW_ATTACHMENT);
+    });
+
+    test("synthesizes an empty Arrow IPC attachment for empty results so the client always gets a Table", () => {
+      const connector = createConnector();
+      // Empty result: no attachment, no data_array, no external_links — but
+      // the manifest still describes the schema. The connector should fill in
+      // `attachment` with a zero-row Arrow IPC matching the schema.
+      const response = {
+        statement_id: "stmt-empty",
+        status: { state: "SUCCEEDED" },
+        manifest: {
+          format: "ARROW_STREAM",
+          schema: {
+            columns: [
+              { name: "user_id", type_text: "BIGINT", type_name: "BIGINT" },
+              { name: "name", type_text: "STRING", type_name: "STRING" },
+              {
+                name: "balance",
+                type_text: "DECIMAL(10,2)",
+                type_name: "DECIMAL",
+              },
+            ],
+          },
+          total_row_count: 0,
+        },
+        result: {},
+      } as unknown as sql.StatementResponse;
+
+      const transformed = (connector as any)._transformDataArray(response);
+      const attachment: string = transformed.result.attachment;
+      expect(typeof attachment).toBe("string");
+      expect(attachment.length).toBeGreaterThan(0);
+
+      // Verify the synthesized attachment decodes into the right empty schema.
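+      // (An Arrow IPC stream with zero record batches is just a schema
+      // message followed by the end-of-stream marker, so `tableFromIPC`
+      // still yields a fully typed 0-row Table.)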
+ const table = tableFromIPC(Buffer.from(attachment, "base64")); + expect(table.numRows).toBe(0); + expect(table.schema.fields.map((f) => f.name)).toEqual([ + "user_id", + "name", + "balance", + ]); + }); + + test("does NOT synthesize an attachment when external_links are present", () => { + const connector = createConnector(); + const response = { + statement_id: "stmt-ext", + status: { state: "SUCCEEDED" }, + manifest: { + format: "ARROW_STREAM", + schema: { columns: [{ name: "x", type_text: "INT" }] }, + }, + result: { + external_links: [ + { external_link: "https://example.com/x", expiration: "9999" }, + ], + }, + } as unknown as sql.StatementResponse; + + const transformed = (connector as any)._transformDataArray(response); + // External-links path returns the statement_id projection — no attachment. + expect(transformed.result.attachment).toBeUndefined(); + expect(transformed.result.statement_id).toBe("stmt-ext"); + }); + + test("does NOT synthesize an attachment when schema is missing", () => { + const connector = createConnector(); + const response = { + statement_id: "stmt-no-schema", + status: { state: "SUCCEEDED" }, + manifest: { format: "ARROW_STREAM" }, + result: {}, + } as unknown as sql.StatementResponse; + + const transformed = (connector as any)._transformDataArray(response); + // Without a schema we cannot build a Table — pass through unchanged. + expect(transformed.result?.attachment).toBeUndefined(); + }); + + test("rejects oversized attachments to bound memory", () => { + const connector = createConnector(); + // 25 MiB decoded cap (Databricks API hard cap on INLINE) → 36 MiB of + // base64 chars decodes to ~27 MiB, comfortably above the limit. + const oversized = "A".repeat(36 * 1024 * 1024); + const response = { + statement_id: "stmt-oversized", + status: { state: "SUCCEEDED" }, + manifest: { format: "ARROW_STREAM" }, + result: { attachment: oversized }, + } as unknown as sql.StatementResponse; + + expect(() => (connector as any)._transformDataArray(response)).toThrow( + /exceeds maximum size/, + ); + }); + }); + + describe("ARROW_STREAM with data_array (hypothetical inline variant)", () => { + test("transforms data_array like JSON_ARRAY path", () => { + const connector = createConnector(); + const response = { + statement_id: "stmt-1", + status: { state: "SUCCEEDED" }, + manifest: { + format: "ARROW_STREAM", + schema: { + columns: [ + { name: "id", type_name: "INT" }, + { name: "value", type_name: "STRING" }, + ], + }, + }, + result: { + data_array: [ + ["1", "hello"], + ["2", "world"], + ], + }, + } as unknown as sql.StatementResponse; + + const result = (connector as any)._transformDataArray(response); + expect(result.result.data).toEqual([ + { id: "1", value: "hello" }, + { id: "2", value: "world" }, + ]); + }); + }); + + describe("edge cases", () => { + test("returns response unchanged when no data_array, attachment, or schema", () => { + const connector = createConnector(); + const response = { + statement_id: "stmt-1", + status: { state: "SUCCEEDED" }, + manifest: { format: "JSON_ARRAY" }, + result: {}, + } as unknown as sql.StatementResponse; + + const result = (connector as any)._transformDataArray(response); + expect(result).toBe(response); + }); + + test("attachment takes priority over data_array when both present", () => { + const connector = createConnector(); + const response = { + statement_id: "stmt-1", + status: { state: "SUCCEEDED" }, + manifest: { + format: "ARROW_STREAM", + schema: { + columns: [ + { name: "test_col", type_name: "INT" }, + { name: 
"test_col2", type_name: "INT" }, + ], + }, + }, + result: { + attachment: REAL_ARROW_ATTACHMENT, + data_array: [["999", "999"]], + }, + } as unknown as sql.StatementResponse; + + const result = (connector as any)._transformDataArray(response); + // Should pass attachment through (client decodes), not transform data_array + expect(result.result.attachment).toBe(REAL_ARROW_ATTACHMENT); + expect(result.result.data).toBeUndefined(); + }); + }); +}); diff --git a/packages/appkit/src/errors/execution.ts b/packages/appkit/src/errors/execution.ts index 42de77043..1e6d1f5f6 100644 --- a/packages/appkit/src/errors/execution.ts +++ b/packages/appkit/src/errors/execution.ts @@ -16,13 +16,39 @@ export class ExecutionError extends AppKitError { readonly isRetryable = false; /** - * Create an execution error for statement failure + * Structured error code from the upstream source (typically the warehouse's + * `error_code` for statement-level failures, or the SDK's `ApiError.errorCode` + * for HTTP failures). Preserved through wrapping so callers can branch on a + * stable identifier without substring-matching the message. */ - static statementFailed(errorMessage?: string): ExecutionError { + readonly errorCode?: string; + + constructor( + message: string, + options?: { + cause?: Error; + context?: Record; + errorCode?: string; + }, + ) { + super(message, options); + this.errorCode = options?.errorCode; + } + + /** + * Create an execution error for statement failure. + * @param errorMessage Human-readable error from the warehouse / SDK. + * @param errorCode Structured code (e.g. "INVALID_PARAMETER_VALUE") to + * preserve through wrapping. Optional. + */ + static statementFailed( + errorMessage?: string, + errorCode?: string, + ): ExecutionError { const message = errorMessage ? `Statement failed: ${errorMessage}` : "Statement failed: Unknown error"; - return new ExecutionError(message); + return new ExecutionError(message, { errorCode }); } /** diff --git a/packages/appkit/src/plugins/agents/agents.ts b/packages/appkit/src/plugins/agents/agents.ts index 3c20d6165..87a46d34f 100644 --- a/packages/appkit/src/plugins/agents/agents.ts +++ b/packages/appkit/src/plugins/agents/agents.ts @@ -4,7 +4,6 @@ import type express from "express"; import pc from "picocolors"; import type { AgentAdapter, - AgentEvent, AgentRunContext, AgentToolDefinition, IAppRouter, @@ -16,7 +15,6 @@ import type { ToolProvider, } from "shared"; import { AppKitMcpClient, buildMcpHostPolicy } from "../../connectors/mcp"; -import { getWorkspaceClient } from "../../context"; import { consumeAdapterStream } from "../../core/agent/consume-adapter-stream"; import { loadAgentsFromDir } from "../../core/agent/load-agents"; import { normalizeToolResult } from "../../core/agent/normalize-result"; diff --git a/packages/appkit/src/plugins/agents/tests/dos-limits.test.ts b/packages/appkit/src/plugins/agents/tests/dos-limits.test.ts index e2bbcbe99..a0c64e578 100644 --- a/packages/appkit/src/plugins/agents/tests/dos-limits.test.ts +++ b/packages/appkit/src/plugins/agents/tests/dos-limits.test.ts @@ -307,7 +307,7 @@ describe("runSubAgent — depth guard", () => { * so we can drive `runSubAgent` directly against the depth guard. 
 */
 function makeRunState(
-  plugin: AgentsPlugin,
+  _plugin: AgentsPlugin,
   overrides: Partial<{
     maxToolCalls: number;
     maxSubAgentDepth: number;
diff --git a/packages/appkit/src/plugins/analytics/analytics.ts b/packages/appkit/src/plugins/analytics/analytics.ts
index 75537b357..d38077d8f 100644
--- a/packages/appkit/src/plugins/analytics/analytics.ts
+++ b/packages/appkit/src/plugins/analytics/analytics.ts
@@ -1,12 +1,15 @@
 import type { WorkspaceClient } from "@databricks/sdk-experimental";
 import type express from "express";
-import type {
-  AgentToolDefinition,
-  IAppRouter,
-  PluginExecuteConfig,
-  SQLTypeMarker,
-  StreamExecutionSettings,
-  ToolProvider,
+import {
+  type AgentToolDefinition,
+  type AnalyticsSseMessage,
+  type IAppRouter,
+  makeArrowMessage,
+  makeResultMessage,
+  type PluginExecuteConfig,
+  type SQLTypeMarker,
+  type StreamExecutionSettings,
+  type ToolProvider,
 } from "shared";
 import { z } from "zod";
 import { SQLWarehouseConnector } from "../../connectors";
@@ -18,18 +21,21 @@ import {
   toolsFromRegistry,
 } from "../../core/agent/tools/define-tool";
 import { assertReadOnlySql } from "../../core/agent/tools/sql-policy";
+import { ExecutionError } from "../../errors";
 import { createLogger } from "../../logging/logger";
 import { Plugin, toPlugin } from "../../plugin";
 import type { PluginManifest } from "../../registry";
 import { queryDefaults } from "./defaults";
+import { InlineArrowStash } from "./inline-arrow-stash";
 import manifest from "./manifest.json";
 import { QueryProcessor } from "./query";
-import type {
-  AnalyticsQueryResponse,
-  IAnalyticsConfig,
-  IAnalyticsQueryRequest,
+import {
+  type AnalyticsFormat,
+  type AnalyticsQueryResponse,
+  type IAnalyticsConfig,
+  type IAnalyticsQueryRequest,
+  normalizeAnalyticsFormat,
 } from "./types";
-import { normalizeAnalyticsFormat } from "./types";
 
 const logger = createLogger("analytics");
 
@@ -44,6 +50,17 @@ export class AnalyticsPlugin extends Plugin implements ToolProvider {
   private SQLClient: SQLWarehouseConnector;
   private queryProcessor: QueryProcessor;
 
+  /**
+   * Server-side stash for inline Arrow IPC payloads.
+   *
+   * INLINE ARROW_STREAM responses do not ride the SSE control channel —
+   * the route puts the decoded bytes here and emits an `arrow` SSE
+   * message with a synthetic `inline-<uuid>` id, and the client fetches
+   * the bytes through the existing `/arrow-result/:jobId` endpoint with
+   * a real binary content-type.
+   */
+  protected inlineArrowStash: InlineArrowStash = new InlineArrowStash();
+
   constructor(config: IAnalyticsConfig) {
     super(config);
     this.config = config;
@@ -81,24 +98,60 @@ export class AnalyticsPlugin extends Plugin implements ToolProvider {
 
   /**
    * Handle Arrow data download requests.
-   * When called via asUser(req), uses the user's Databricks credentials.
+   *
+   * Two id shapes are supported:
+   * - `inline-<uuid>`: bytes were stashed server-side by the query route.
+   *   Drain the stash, serve directly with the canonical Arrow content
+   *   type. No warehouse round-trip.
+   * - any other id: a warehouse-issued statement id. Fetch the Arrow
+   *   stream from the warehouse via the SDK; serve the bytes.
+   *
+   * When called via asUser(req), uses the user's Databricks credentials
+   * for the warehouse path. The inline path is user-scoped at the stash
+   * layer instead.
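+   *
+   * Illustrative fetches (both ids hypothetical, shapes only):
+   *   GET /api/analytics/arrow-result/inline-9b2f0c4e-...  -> served from the stash
+   *   GET /api/analytics/arrow-result/01f0aa3b-22cc-...    -> fetched from the warehouse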
   */
  async _handleArrowRoute(
    req: express.Request,
    res: express.Response,
  ): Promise<void> {
+    const { jobId } = req.params;
+    const event = logger.event(req);
+    event?.setComponent("analytics", "getArrowData").setContext("analytics", {
+      job_id: jobId,
+      plugin: this.name,
+    });
+
+    if (jobId.startsWith("inline-")) {
+      const userKey = this._stashUserKey(req);
+      const bytes = this.inlineArrowStash.take(jobId, userKey);
+      if (!bytes) {
+        // Already drained, expired, or never belonged to this user. 410
+        // distinguishes this from "warehouse statement id not found" (404)
+        // so the client can surface a useful error.
+        logger.debug("Inline Arrow stash miss for jobId=%s", jobId);
+        res.status(410).json({
+          error: "Inline Arrow result expired or unknown",
+          plugin: this.name,
+        });
+        return;
+      }
+      logger.debug(
+        "Serving inline Arrow buffer: %d bytes for jobId=%s",
+        bytes.length,
+        jobId,
+      );
+      res.setHeader("Content-Type", "application/vnd.apache.arrow.stream");
+      res.setHeader("Content-Length", bytes.length.toString());
+      // Inline payloads are single-use and short-lived; no public caching.
+      res.setHeader("Cache-Control", "no-store");
+      res.send(Buffer.from(bytes.buffer, bytes.byteOffset, bytes.byteLength));
+      return;
+    }
+
     try {
-      const { jobId } = req.params;
       const workspaceClient = getWorkspaceClient();
 
-      logger.debug("Processing Arrow job request for jobId=%s", jobId);
-      const event = logger.event(req);
-      event?.setComponent("analytics", "getArrowData").setContext("analytics", {
-        job_id: jobId,
-        plugin: this.name,
-      });
-
       const result = await this.getArrowData(workspaceClient, jobId);
 
       res.setHeader("Content-Type", "application/octet-stream");
@@ -120,6 +173,28 @@
     }
   }
 
+  /**
+   * Stash key used at put-time (in `_handleQueryRoute`) and take-time
+   * (in `_handleArrowRoute`). Centralized so the two sides cannot drift.
+   *
+   * Returns the user id when an `x-forwarded-user` header is present,
+   * otherwise `"global"` for service-principal contexts (no user header).
+   * Both routes resolve the same key for a given caller, so the
+   * subsequent /arrow-result fetch reliably hits the entry stashed
+   * during the SSE query.
+   *
+   * `resolveUserId` throws when no header is present — catch and degrade
+   * to "global" rather than letting that failure mode bubble through the
+   * route handler.
+   */
+  protected _stashUserKey(req: express.Request): string {
+    try {
+      return this.resolveUserId(req) || "global";
+    } catch {
+      return "global";
+    }
+  }
+
   /**
    * Handle SQL query execution requests.
    * When called via asUser(req), uses the user's Databricks credentials.
@@ -131,6 +206,19 @@
     const { query_key } = req.params;
     const { parameters, format: rawFormat = "JSON_ARRAY" } =
       req.body as IAnalyticsQueryRequest;
+
+    if (
+      rawFormat !== "JSON_ARRAY" &&
+      rawFormat !== "ARROW_STREAM" &&
+      rawFormat !== "JSON" &&
+      rawFormat !== "ARROW"
+    ) {
+      res.status(400).json({
+        error: `Invalid format: ${String(rawFormat)}. Expected "JSON_ARRAY" or "ARROW_STREAM".`,
+      });
+      return;
+    }
+
     const format = normalizeAnalyticsFormat(rawFormat);
 
     // Request-scoped logging with WideEvent tracking
@@ -165,35 +253,41 @@
       // get execution context - user-scoped if .obo.sql, otherwise service principal
       const executor = isAsUser ? this.asUser(req) : this;
       const executorKey = isAsUser ? this.resolveUserId(req) : "global";
-
-      const queryParameters =
-        format === "ARROW_STREAM"
-          ? {
-              formatParameters: {
-                disposition: "EXTERNAL_LINKS",
-                format: "ARROW_STREAM",
-              },
-              type: "arrow",
-            }
-          : {
-              type: "result",
-            };
+      // Stash key derives from the per-request user, independent of the
+      // executor's cache scope (executorKey may be "global" even when the
+      // request carries a user). Inline Arrow payloads are single-use
+      // and short-lived — there is no benefit to sharing them across users,
+      // and per-user scoping is defense in depth on top of unguessable ids.
+      const stashUserKey = this._stashUserKey(req);
 
       const hashedQuery = this.queryProcessor.hashQuery(query);
 
+      // ARROW_STREAM responses reference ephemeral resources that cannot be
+      // safely replayed from cache:
+      // - EXTERNAL_LINKS pre-signed URLs expire ~15 min after issue, and
+      //   the warehouse rotates them per execution.
+      // - INLINE responses point at a synthetic `inline-<uuid>` job id
+      //   backed by `InlineArrowStash`, which drains on the first
+      //   /arrow-result fetch. A cache hit would replay an id whose bytes
+      //   are already gone and reliably 410 the client.
+      // So we bypass cache for ARROW_STREAM and let every request execute
+      // a fresh statement. JSON_ARRAY responses still cache normally.
+      const cacheTtl = format === "ARROW_STREAM" ? 0 : queryDefaults.cache?.ttl;
+      const cacheConfig = {
+        ...queryDefaults.cache,
+        ttl: cacheTtl,
+        cacheKey: [
+          "analytics:query",
+          query_key,
+          JSON.stringify(parameters),
+          format,
+          hashedQuery,
+          executorKey,
+        ],
+      };
+
       const defaultConfig: PluginExecuteConfig = {
         ...queryDefaults,
-        cache: {
-          ...queryDefaults.cache,
-          cacheKey: [
-            "analytics:query",
-            query_key,
-            JSON.stringify(parameters),
-            JSON.stringify(format),
-            hashedQuery,
-            executorKey,
-          ],
-        },
+        cache: cacheConfig,
       };
 
       const streamExecutionSettings: StreamExecutionSettings = {
@@ -208,20 +302,126 @@
           parameters,
         );
 
-        const result = await executor.query(
+        return this._executeWithFormatFallback(
+          executor,
           query,
           processedParams,
-          queryParameters.formatParameters,
+          format,
+          stashUserKey,
           signal,
         );
-
-        return { type: queryParameters.type, ...result };
       },
       streamExecutionSettings,
       executorKey,
     );
   }
 
+  /**
+   * Execute a query with automatic disposition fallback for ARROW_STREAM.
+   *
+   * - JSON_ARRAY: always uses INLINE disposition, no fallback.
+   * - ARROW_STREAM: tries INLINE first, falls back to EXTERNAL_LINKS.
+   *   This handles warehouses that only support one disposition.
+   *
+   * INLINE attachments are decoded once and put on the plugin's
+   * `inlineArrowStash`; the SSE message carries the synthetic stash id so
+   * the client fetches the bytes out-of-band via `/arrow-result/<id>`.
+   */
+  private async _executeWithFormatFallback(
+    executor: AnalyticsPlugin,
+    query: string,
+    processedParams: Record<string, unknown> | undefined,
+    requestedFormat: AnalyticsFormat,
+    stashUserKey: string,
+    signal?: AbortSignal,
+  ): Promise<AnalyticsSseMessage> {
+    if (requestedFormat === "JSON_ARRAY") {
+      const result = await executor.query(
+        query,
+        processedParams,
+        { disposition: "INLINE", format: "JSON_ARRAY" },
+        signal,
+      );
+      return makeResultMessage(result?.data, {
+        status: result?.status,
+        statement_id: result?.statement_id,
+      });
+    }
+
+    // ARROW_STREAM: try INLINE first, fall back to EXTERNAL_LINKS.
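+    // Shape of what follows, schematically (a sketch, not normative):
+    //   INLINE attempt
+    //     - result.attachment present -> decode + stash -> arrow(inline-<id>)
+    //     - no attachment             -> result message (rows + status)
+    //     - rejection matched by _isInlineArrowUnsupported
+    //         -> EXTERNAL_LINKS attempt -> arrow(result.statement_id)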
+ try { + const result = await executor.query( + query, + processedParams, + { disposition: "INLINE", format: "ARROW_STREAM" }, + signal, + ); + // INLINE responses with an Arrow IPC attachment go through the + // stash-and-serve path: decode the base64 once, hold the bytes + // server-side, emit a synthetic statement id. The client fetches via + // /arrow-result so multi-MiB Arrow blobs never traverse SSE. + if (result?.attachment) { + // If the client has already disconnected, the SSE write would be + // dropped anyway — skip the decode + stash so the bytes do not + // linger in memory until TTL eviction. + if (signal?.aborted) { + throw ExecutionError.canceled(); + } + const decoded = Buffer.from(result.attachment, "base64"); + const inlineId = this.inlineArrowStash.put( + stashUserKey, + new Uint8Array( + decoded.buffer, + decoded.byteOffset, + decoded.byteLength, + ), + ); + if (inlineId === null) { + // Stash is full — every id we have already handed out must + // stay valid, so the stash refuses new entries rather than + // evicting in-flight ones. Fall back to EXTERNAL_LINKS for + // this request so the client still gets its result. + logger.warn( + "Inline Arrow stash full, falling back to EXTERNAL_LINKS for the current query", + ); + } else { + return makeArrowMessage(inlineId, { status: result.status }); + } + } else { + return makeResultMessage(result?.data, { + status: result?.status, + statement_id: result?.statement_id, + }); + } + } catch (err: unknown) { + // If the request was aborted, do not retry — the signal is dead and + // a second statement would be billed but never read. + if (signal?.aborted) { + throw err; + } + + if (!_isInlineArrowUnsupported(err)) { + throw err; + } + + const msg = err instanceof Error ? err.message : String(err); + logger.warn( + "ARROW_STREAM INLINE rejected by warehouse, falling back to EXTERNAL_LINKS: %s", + msg, + ); + } + + const result = await executor.query( + query, + processedParams, + { disposition: "EXTERNAL_LINKS", format: "ARROW_STREAM" }, + signal, + ); + return makeArrowMessage(result.statement_id, { status: result.status }); + } + /** * Execute a SQL query using the current execution context. * @@ -338,6 +538,48 @@ export class AnalyticsPlugin extends Plugin implements ToolProvider { } } +/** + * Determine whether a warehouse error indicates that ARROW_STREAM + INLINE + * is unsupported, vs an unrelated SQL/permission error. + * + * Preferred path: read the structured `errorCode` we now propagate from the + * SDK's `ApiError.errorCode` and the warehouse's `status.error.error_code` + * through `ExecutionError`. This is stable across error-message wording + * changes. + * + * Substring backstop: if the upstream error didn't surface a code (legacy + * SDK builds, or errors thrown outside the connector's wrap path), fall + * back to requiring both INLINE and ARROW_STREAM keywords in the message + * plus a marker phrase. The pair-requirement avoids matching unrelated SQL + * errors that happen to mention one of the words (e.g. a column named + * `INLINE_USERS`). + */ +function _isInlineArrowUnsupported(err: unknown): boolean { + const structuredCode = + err instanceof ExecutionError ? err.errorCode : undefined; + if ( + structuredCode === "INVALID_PARAMETER_VALUE" || + structuredCode === "NOT_IMPLEMENTED" + ) { + // Structured code already tells us the warehouse rejected the request. + // Require keyword pairing to confirm it's the disposition/format combo + // (vs an INVALID_PARAMETER_VALUE for something else entirely). 
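+    // Examples, taken from the tests below. A message that should match:
+    //   "INVALID_PARAMETER_VALUE: ARROW_STREAM not supported with INLINE disposition"
+    // and one that must not, because only one keyword appears:
+    //   "INVALID_PARAMETER_VALUE: INLINE is not a valid value for parameter `mode`"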
+    const msg = err instanceof Error ? err.message : String(err);
+    return msg.includes("INLINE") && msg.includes("ARROW_STREAM");
+  }
+
+  // Backstop for errors without a structured code.
+  const msg = err instanceof Error ? err.message : String(err);
+  if (!msg.includes("INLINE") || !msg.includes("ARROW_STREAM")) {
+    return false;
+  }
+  return (
+    msg.includes("not supported") ||
+    msg.includes("INVALID_PARAMETER_VALUE") ||
+    msg.includes("NOT_IMPLEMENTED")
+  );
+}
+
 /**
  * @internal
  */
diff --git a/packages/appkit/src/plugins/analytics/inline-arrow-stash.ts b/packages/appkit/src/plugins/analytics/inline-arrow-stash.ts
new file mode 100644
index 000000000..3ae98330e
--- /dev/null
+++ b/packages/appkit/src/plugins/analytics/inline-arrow-stash.ts
@@ -0,0 +1,147 @@
+import { randomUUID } from "node:crypto";
+
+/**
+ * Server-side stash for inline Arrow IPC payloads.
+ *
+ * When a warehouse returns ARROW_STREAM + INLINE results, the bytes are
+ * stashed here and a synthetic "inline-<uuid>" job id is emitted on the
+ * SSE control channel. The client then fetches the bytes out-of-band via
+ * `/arrow-result/<id>`, which drains the stash and serves the payload as
+ * `application/vnd.apache.arrow.stream`.
+ *
+ * Keeps multi-MiB Arrow blobs off SSE, lets the existing /arrow-result
+ * pipeline handle both inline and EXTERNAL_LINKS results uniformly, and
+ * delivers the bytes with a real binary content-type instead of base64
+ * inside JSON inside SSE framing.
+ *
+ * Properties:
+ * - **Drain-on-read**: a successful `take()` removes the entry. There is
+ *   no replay path — a lost client connection means the bytes are gone.
+ * - **TTL bounded**: entries past their expiry are evicted on every
+ *   `put()` and `take()`. No background timer.
+ * - **Per-user keyed**: `take()` only returns bytes if the requesting
+ *   user matches the user that originally put them. Defense in depth on
+ *   top of unguessable ids.
+ * - **Memory bounded with rejection**: total stashed bytes are capped.
+ *   When `put()` cannot fit a payload without exceeding the cap it
+ *   returns `null` rather than evicting older entries — every issued id
+ *   stays valid until it is drained, expires, or the process exits.
+ *   Callers are expected to fall back to a different delivery path (e.g.
+ *   EXTERNAL_LINKS) when `put()` rejects.
+ *
+ * Caveat (multi-replica deployments): this stash is process-local. A
+ * subsequent `GET /arrow-result/inline-*` that lands on a different
+ * replica than the one that stashed the bytes will 410. Deployments
+ * that run more than one replica need sticky sessions (route both
+ * requests in the same logical session to the same replica) or a
+ * shared external store, neither of which is in scope here.
+ */
+interface InlineArrowStashOptions {
+  /** Entries older than this are dropped on the next gc tick. */
+  ttlMs?: number;
+  /**
+   * Hard cap on total bytes held. `put()` rejects (returns `null`) once
+   * the cap would be exceeded; entries already in the stash are not
+   * evicted to fit new ones.
+   */
+  maxBytes?: number;
+  /** Test seam: override the synthetic-id generator. */
+  idGenerator?: () => string;
+  /** Test seam: override the clock. */
+  now?: () => number;
+}
+
+interface StashEntry {
+  userId: string;
+  bytes: Uint8Array;
+  expiresAt: number;
+  insertedAt: number;
+}
+
+export class InlineArrowStash {
+  private entries = new Map<string, StashEntry>();
+  private totalBytes = 0;
+  private readonly ttlMs: number;
+  private readonly maxBytes: number;
+  private readonly idGenerator: () => string;
+  private readonly now: () => number;
+
+  constructor(opts: InlineArrowStashOptions = {}) {
+    this.ttlMs = opts.ttlMs ?? 10 * 60 * 1000;
+    this.maxBytes = opts.maxBytes ?? 256 * 1024 * 1024;
+    this.idGenerator = opts.idGenerator ?? randomUUID;
+    this.now = opts.now ?? Date.now;
+  }
+
+  /**
+   * Stash a payload and return its synthetic job id, or `null` when the
+   * stash cannot accept it without evicting older entries. The caller is
+   * expected to fall back to an out-of-band delivery path (e.g.
+   * EXTERNAL_LINKS) when the return value is `null`.
+   *
+   * Single payloads that exceed `maxBytes` outright throw so the caller
+   * sees the misconfiguration loudly instead of degrading silently every
+   * time.
+   */
+  put(userId: string, bytes: Uint8Array): string | null {
+    if (bytes.length > this.maxBytes) {
+      throw new Error(
+        `Inline Arrow payload (${bytes.length} bytes) exceeds stash maxBytes (${this.maxBytes})`,
+      );
+    }
+    this.gc();
+    if (this.totalBytes + bytes.length > this.maxBytes) {
+      // Refuse rather than evicting: every id we have already issued must
+      // remain valid until naturally drained or expired.
+      return null;
+    }
+    const id = `inline-${this.idGenerator()}`;
+    const now = this.now();
+    this.entries.set(id, {
+      userId,
+      bytes,
+      expiresAt: now + this.ttlMs,
+      insertedAt: now,
+    });
+    this.totalBytes += bytes.length;
+    return id;
+  }
+
+  /**
+   * Drain a payload from the stash. Returns `undefined` if the id is
+   * unknown, expired, or belongs to a different user.
+   */
+  take(id: string, userId: string): Uint8Array | undefined {
+    this.gc();
+    const entry = this.entries.get(id);
+    if (!entry) return undefined;
+    if (entry.userId !== userId) return undefined;
+    this.entries.delete(id);
+    this.totalBytes -= entry.bytes.length;
+    return entry.bytes;
+  }
+
+  /** Inspection helpers (primarily for tests). */
+  size(): number {
+    return this.totalBytes;
+  }
+  count(): number {
+    return this.entries.size;
+  }
+
+  /** Drop all entries (used in plugin shutdown).
*/ + clear(): void { + this.entries.clear(); + this.totalBytes = 0; + } + + private gc(): void { + const now = this.now(); + for (const [id, entry] of this.entries) { + if (entry.expiresAt <= now) { + this.entries.delete(id); + this.totalBytes -= entry.bytes.length; + } + } + } +} diff --git a/packages/appkit/src/plugins/analytics/tests/analytics.test.ts b/packages/appkit/src/plugins/analytics/tests/analytics.test.ts index eb06ea952..b4522eabe 100644 --- a/packages/appkit/src/plugins/analytics/tests/analytics.test.ts +++ b/packages/appkit/src/plugins/analytics/tests/analytics.test.ts @@ -106,6 +106,87 @@ describe("Analytics Plugin", () => { ); }); + test("/arrow-result/inline-* drains the stash and serves bytes as application/vnd.apache.arrow.stream", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + plugin.injectRoutes(router); + + const arrowBytes = new Uint8Array([0xff, 0xfe, 0xfd, 0xfc]); + const id = (plugin as any).inlineArrowStash.put("global", arrowBytes); + expect(id.startsWith("inline-")).toBe(true); + + const handler = getHandler("GET", "/arrow-result/:jobId"); + const mockReq = createMockRequest({ params: { jobId: id } }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + expect(mockRes.setHeader).toHaveBeenCalledWith( + "Content-Type", + "application/vnd.apache.arrow.stream", + ); + expect(mockRes.setHeader).toHaveBeenCalledWith( + "Content-Length", + String(arrowBytes.length), + ); + expect(mockRes.setHeader).toHaveBeenCalledWith( + "Cache-Control", + "no-store", + ); + expect(mockRes.send).toHaveBeenCalledTimes(1); + const sentBuf = (mockRes.send as any).mock.calls[0][0] as Buffer; + expect(Buffer.isBuffer(sentBuf)).toBe(true); + expect(Array.from(sentBuf)).toEqual(Array.from(arrowBytes)); + + // Drain-on-read: a second fetch must return 410, not the bytes again. + const secondRes = createMockResponse(); + await handler(mockReq, secondRes); + expect(secondRes.status).toHaveBeenCalledWith(410); + }); + + test("/arrow-result/inline-* returns 410 when the stash entry never existed", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + plugin.injectRoutes(router); + + const handler = getHandler("GET", "/arrow-result/:jobId"); + const mockReq = createMockRequest({ + params: { jobId: "inline-does-not-exist" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + expect(mockRes.status).toHaveBeenCalledWith(410); + expect(mockRes.json).toHaveBeenCalledWith( + expect.objectContaining({ + error: expect.stringMatching(/expired or unknown/), + }), + ); + }); + + test("/arrow-result/inline-* returns 410 when the stash entry belongs to a different user", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + plugin.injectRoutes(router); + + // Stash entry keyed to user-a, but the request resolves to "global" + // (no x-forwarded-user header) — keys differ, take must return + // nothing, and the entry stays put (single-user view). 
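+      // (Key derivation mirrors _stashUserKey: resolveUserId throws when
+      // the x-forwarded-user header is absent, so the route degrades to
+      // the "global" key, which cannot match the "user-a" entry.)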
+ const bytes = new Uint8Array([1, 2, 3]); + const id = (plugin as any).inlineArrowStash.put("user-a", bytes); + + const handler = getHandler("GET", "/arrow-result/:jobId"); + const mockReq = createMockRequest({ params: { jobId: id } }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + expect(mockRes.status).toHaveBeenCalledWith(410); + // The entry must still be there for the real owner. + expect((plugin as any).inlineArrowStash.take(id, "user-a")).toBeDefined(); + }); + test("/query/:query_key should return 400 when query_key is missing", async () => { const plugin = new AnalyticsPlugin(config); const { router, getHandler } = createMockRouter(); @@ -581,6 +662,472 @@ describe("Analytics Plugin", () => { ); }); + test("/query/:query_key should pass INLINE + ARROW_STREAM format parameters when format is ARROW_STREAM", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + const executeMock = vi.fn().mockResolvedValue({ + result: { data: [{ id: 1 }] }, + }); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {}, format: "ARROW_STREAM" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + expect(executeMock).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + statement: "SELECT * FROM test", + warehouse_id: "test-warehouse-id", + disposition: "INLINE", + format: "ARROW_STREAM", + }), + expect.any(AbortSignal), + ); + }); + + test("/query/:query_key should use INLINE + JSON_ARRAY by default when no format specified", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + const executeMock = vi.fn().mockResolvedValue({ + result: { data: [{ id: 1 }] }, + }); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {} }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + expect(executeMock).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + disposition: "INLINE", + format: "JSON_ARRAY", + }), + expect.any(AbortSignal), + ); + }); + + test("/query/:query_key should pass INLINE + JSON_ARRAY when format is explicitly JSON_ARRAY", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + const executeMock = vi.fn().mockResolvedValue({ + result: { data: [{ id: 1 }] }, + }); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {}, format: "JSON_ARRAY" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, 
mockRes); + + expect(executeMock.mock.calls[0][1]).toMatchObject({ + disposition: "INLINE", + format: "JSON_ARRAY", + }); + }); + + test("/query/:query_key should fall back ARROW_STREAM from INLINE to EXTERNAL_LINKS when warehouse rejects INLINE", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + const executeMock = vi + .fn() + .mockRejectedValueOnce( + new Error( + "INVALID_PARAMETER_VALUE: ARROW_STREAM not supported with INLINE disposition", + ), + ) + .mockResolvedValueOnce({ + result: { statement_id: "stmt-1", status: { state: "SUCCEEDED" } }, + }); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {}, format: "ARROW_STREAM" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + // First call: INLINE (rejected) + expect(executeMock.mock.calls[0][1]).toMatchObject({ + disposition: "INLINE", + format: "ARROW_STREAM", + }); + // Second call: EXTERNAL_LINKS (fallback) + expect(executeMock.mock.calls[1][1]).toMatchObject({ + disposition: "EXTERNAL_LINKS", + format: "ARROW_STREAM", + }); + }); + + test("/query/:query_key falls back on a structured ExecutionError.errorCode without scanning the message", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + // Properly-structured ExecutionError, as the connector now produces + // when the SDK's ApiError surfaces with errorCode set. + const { ExecutionError } = await import("../../../errors/execution"); + const structuredError = ExecutionError.statementFailed( + "ARROW_STREAM is not supported with INLINE disposition", + "INVALID_PARAMETER_VALUE", + ); + + const executeMock = vi + .fn() + .mockRejectedValueOnce(structuredError) + .mockResolvedValueOnce({ + result: { statement_id: "stmt-1", status: { state: "SUCCEEDED" } }, + }); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {}, format: "ARROW_STREAM" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + // Both attempts: INLINE (rejected via structured code) → EXTERNAL_LINKS. + expect(executeMock).toHaveBeenCalledTimes(2); + expect(executeMock.mock.calls[1][1]).toMatchObject({ + disposition: "EXTERNAL_LINKS", + format: "ARROW_STREAM", + }); + }); + + test("/query/:query_key falls back when error message carries a structured INVALID_PARAMETER_VALUE error_code", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + // Wrapped JSON error like the SDK surfaces from a `Bad Request` HTTP + // response. Both INLINE and ARROW_STREAM appear, plus the structured code. 
+ const wrappedJsonError = new Error( + 'Response from server (Bad Request) {"error_code":"INVALID_PARAMETER_VALUE","message":"ARROW_STREAM is not supported with INLINE disposition on this warehouse"}', + ); + const executeMock = vi + .fn() + .mockRejectedValueOnce(wrappedJsonError) + .mockResolvedValueOnce({ + result: { statement_id: "stmt-1", status: { state: "SUCCEEDED" } }, + }); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {}, format: "ARROW_STREAM" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + // Both attempts ran: INLINE (rejected) then EXTERNAL_LINKS (succeeded). + expect(executeMock).toHaveBeenCalledTimes(2); + expect(executeMock.mock.calls[1][1]).toMatchObject({ + disposition: "EXTERNAL_LINKS", + format: "ARROW_STREAM", + }); + }); + + test("/query/:query_key does NOT fall back when only one of INLINE/ARROW_STREAM appears in the error", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + // Realistic non-format error that mentions just one of the keywords — + // e.g. an unrelated INVALID_PARAMETER_VALUE about a different param. + const executeMock = vi + .fn() + .mockRejectedValue( + new Error( + 'Response from server (Bad Request) {"error_code":"INVALID_PARAMETER_VALUE","message":"INLINE is not a valid value for parameter `mode`"}', + ), + ); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {}, format: "ARROW_STREAM" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + // The retry interceptor may attempt the query multiple times, but the + // analytics plugin must never escalate to EXTERNAL_LINKS for an error + // that doesn't actually indicate a format/disposition rejection. + for (const call of executeMock.mock.calls) { + expect(call[1]).toMatchObject({ + disposition: "INLINE", + format: "ARROW_STREAM", + }); + } + }); + + test("/query/:query_key should not fall back for non-format errors", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + const executeMock = vi + .fn() + .mockRejectedValue(new Error("PERMISSION_DENIED: no access")); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {}, format: "ARROW_STREAM" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + // Only one call — non-format error is not retried with different disposition. 
+ for (const call of executeMock.mock.calls) { + expect(call[1]).toMatchObject({ + disposition: "INLINE", + format: "ARROW_STREAM", + }); + } + }); + + test("/query/:query_key stashes ARROW_STREAM INLINE bytes and emits an arrow message with a synthetic inline- id", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + // Real base64 so the route can decode it via Buffer.from(..., "base64"). + const arrowBytes = new Uint8Array([1, 2, 3, 4, 5]); + const fakeAttachment = Buffer.from(arrowBytes).toString("base64"); + const executeMock = vi.fn().mockResolvedValue({ + result: { attachment: fakeAttachment, row_count: 1 }, + }); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {}, format: "ARROW_STREAM" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + // The route should not fall back to EXTERNAL_LINKS — INLINE succeeded. + expect(executeMock).toHaveBeenCalledTimes(1); + expect(executeMock.mock.calls[0][1]).toMatchObject({ + disposition: "INLINE", + format: "ARROW_STREAM", + }); + // SSE payload: unified `arrow` message with an inline- prefixed id. + // The base64 attachment must NOT appear on the SSE channel. + const writeCalls = (mockRes.write as any).mock.calls.map( + (c: any[]) => c[0] as string, + ); + const payload = writeCalls.find((s: string) => s.startsWith("data: ")); + expect(payload).toBeDefined(); + expect(payload).toContain('"type":"arrow"'); + expect(payload).toMatch(/"statement_id":"inline-[^"]+"/); + expect(payload).not.toContain("arrow_inline"); + expect(payload).not.toContain(fakeAttachment); + + // The decoded bytes should be in the stash, keyed by the same + // synthetic id; a subsequent /arrow-result fetch will drain them. + const idMatch = payload?.match(/"statement_id":"(inline-[^"]+)"/); + expect(idMatch).not.toBeNull(); + const inlineId = idMatch?.[1]; + const stashed = (plugin as any).inlineArrowStash.take(inlineId, "global"); + expect(stashed).toBeDefined(); + expect(Array.from(stashed)).toEqual(Array.from(arrowBytes)); + }); + + test("/query/:query_key rejects unknown format values with 400", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + const executeMock = vi.fn(); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + // "CSV" is genuinely unsupported. The legacy spellings "JSON" / "ARROW" + // are *accepted* by the route (normalized to JSON_ARRAY / ARROW_STREAM + // for back-compat with appkit < 0.33.0), so they must not be used here. 
+ const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {}, format: "CSV" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + expect(mockRes.status).toHaveBeenCalledWith(400); + expect(executeMock).not.toHaveBeenCalled(); + }); + + test("/query/:query_key does not retry the fallback when the request was aborted", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + const executeMock = vi.fn().mockImplementation((_wc, _opts, signal) => { + // Simulate a signal that becomes aborted before the failure surfaces — + // e.g. the client cancelled the SSE stream mid-query. Use vitest's + // getter spy rather than Object.defineProperty so we don't try to + // override the native non-configurable AbortSignal.aborted getter. + if (signal) { + vi.spyOn(signal, "aborted", "get").mockReturnValue(true); + } + return Promise.reject( + new Error( + "INVALID_PARAMETER_VALUE: ARROW_STREAM not supported with INLINE disposition", + ), + ); + }); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {}, format: "ARROW_STREAM" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + // Even though the error message would normally trigger fallback, the + // aborted signal should short-circuit and prevent a second statement. + expect(executeMock).toHaveBeenCalledTimes(1); + }); + + test("/query/:query_key should not fall back when format is explicitly JSON_ARRAY", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + const executeMock = vi + .fn() + .mockRejectedValue( + new Error("INVALID_PARAMETER_VALUE: only supports ARROW_STREAM"), + ); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {}, format: "JSON_ARRAY" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + // All calls use JSON_ARRAY + INLINE — explicit JSON_ARRAY, no fallback. 
+ for (const call of executeMock.mock.calls) { + expect(call[1]).toMatchObject({ + disposition: "INLINE", + format: "JSON_ARRAY", + }); + } + }); + test("should return 404 when query file is not found", async () => { const plugin = new AnalyticsPlugin(config); const { router, getHandler } = createMockRouter(); diff --git a/packages/appkit/src/plugins/analytics/tests/inline-arrow-stash.test.ts b/packages/appkit/src/plugins/analytics/tests/inline-arrow-stash.test.ts new file mode 100644 index 000000000..9bd598132 --- /dev/null +++ b/packages/appkit/src/plugins/analytics/tests/inline-arrow-stash.test.ts @@ -0,0 +1,116 @@ +import { describe, expect, test } from "vitest"; +import { InlineArrowStash } from "../inline-arrow-stash"; + +function bytes(n: number): Uint8Array { + return new Uint8Array(n); +} + +// `put()` returns `string | null` — it rejects with null when the stash is +// full. Every test below that exercises a successful put narrows via this +// helper so the non-null contract is explicit at the call site. +function mustPut( + stash: InlineArrowStash, + userId: string, + b: Uint8Array, +): string { + const id = stash.put(userId, b); + if (id === null) { + throw new Error("test setup: stash unexpectedly rejected put"); + } + return id; +} + +describe("InlineArrowStash", () => { + test("put returns an inline-prefixed synthetic id", () => { + const stash = new InlineArrowStash({ idGenerator: () => "abc" }); + const id = mustPut(stash, "user-1", bytes(100)); + expect(id).toBe("inline-abc"); + }); + + test("take drains the entry", () => { + const stash = new InlineArrowStash(); + const id = mustPut(stash, "user-1", bytes(100)); + expect(stash.count()).toBe(1); + expect(stash.size()).toBe(100); + + const got = stash.take(id, "user-1"); + expect(got).toBeDefined(); + expect(got?.length).toBe(100); + expect(stash.count()).toBe(0); + expect(stash.size()).toBe(0); + // Drain-on-read: second take returns undefined. + expect(stash.take(id, "user-1")).toBeUndefined(); + }); + + test("take returns undefined for unknown id", () => { + const stash = new InlineArrowStash(); + expect(stash.take("inline-nope", "user-1")).toBeUndefined(); + }); + + test("take returns undefined when userId does not match", () => { + const stash = new InlineArrowStash(); + const id = mustPut(stash, "user-1", bytes(100)); + expect(stash.take(id, "user-2")).toBeUndefined(); + // Entry is still there for the right user. + expect(stash.take(id, "user-1")).toBeDefined(); + }); + + test("entries past TTL are evicted on next gc tick", () => { + let clock = 0; + const stash = new InlineArrowStash({ ttlMs: 1000, now: () => clock }); + const id = mustPut(stash, "user-1", bytes(50)); + clock = 999; + expect(stash.take(id, "user-1")).toBeDefined(); + + const id2 = mustPut(stash, "user-1", bytes(50)); + clock = 2000; + // Bump the clock past TTL and trigger gc via another put. + mustPut(stash, "user-2", bytes(10)); + expect(stash.take(id2, "user-1")).toBeUndefined(); + }); + + test("put returns null when adding the payload would exceed maxBytes, leaving existing entries intact", () => { + let seq = 0; + const stash = new InlineArrowStash({ + maxBytes: 200, + idGenerator: () => String(seq++), + }); + const a = mustPut(stash, "user-1", bytes(80)); + const b = mustPut(stash, "user-1", bytes(80)); + expect(stash.size()).toBe(160); + + // This third 80-byte entry would push total to 240 (>200). It must + // be rejected, and both prior entries must survive — every id we have + // already handed out stays valid until drained or expired. 
+ const c = stash.put("user-1", bytes(80)); + expect(c).toBeNull(); + expect(stash.size()).toBe(160); + expect(stash.take(a, "user-1")).toBeDefined(); + expect(stash.take(b, "user-1")).toBeDefined(); + }); + + test("put throws for a single payload larger than maxBytes (caller misconfiguration)", () => { + const stash = new InlineArrowStash({ maxBytes: 100 }); + expect(() => stash.put("user-1", bytes(200))).toThrow( + /exceeds stash maxBytes/, + ); + }); + + test("synthetic ids are unique across puts", () => { + const stash = new InlineArrowStash(); + const a = mustPut(stash, "user-1", bytes(10)); + const b = mustPut(stash, "user-1", bytes(10)); + expect(a).not.toBe(b); + expect(a.startsWith("inline-")).toBe(true); + expect(b.startsWith("inline-")).toBe(true); + }); + + test("clear drops every entry", () => { + const stash = new InlineArrowStash(); + mustPut(stash, "user-1", bytes(10)); + mustPut(stash, "user-2", bytes(20)); + stash.clear(); + expect(stash.count()).toBe(0); + expect(stash.size()).toBe(0); + }); +}); diff --git a/packages/appkit/src/stream/defaults.ts b/packages/appkit/src/stream/defaults.ts index c8fc91591..5cb822efd 100644 --- a/packages/appkit/src/stream/defaults.ts +++ b/packages/appkit/src/stream/defaults.ts @@ -1,6 +1,13 @@ export const streamDefaults = { bufferSize: 100, - maxEventSize: 1024 * 1024, // 1MB + // 1 MiB. SSE is used only for short JSON control messages — JSON_ARRAY + // result rows (already row-size-bounded by the warehouse) and the small + // `arrow` envelope (statement id + status) for ARROW_STREAM. Bulk Arrow + // payloads do not traverse SSE; they are fetched over HTTP via + // `/api/analytics/arrow-result/:jobId`, which dispatches to the warehouse + // (EXTERNAL_LINKS) or the server-side `InlineArrowStash` (INLINE) based + // on the id prefix. + maxEventSize: 1 * 1024 * 1024, bufferTTL: 10 * 60 * 1000, // 10 minutes cleanupInterval: 5 * 60 * 1000, // 5 minutes maxPersistentBuffers: 10000, // 10000 buffers diff --git a/packages/appkit/src/stream/tests/stream-registry.test.ts b/packages/appkit/src/stream/tests/stream-registry.test.ts index d3f70e95a..efb88c888 100644 --- a/packages/appkit/src/stream/tests/stream-registry.test.ts +++ b/packages/appkit/src/stream/tests/stream-registry.test.ts @@ -374,7 +374,7 @@ describe("StreamRegistry", () => { expect(dataCall).toBeDefined(); const payload = JSON.parse( - (dataCall![0] as string).replace("data: ", "").trim(), + (dataCall?.[0] as string).replace("data: ", "").trim(), ); expect(payload).toEqual({ error: "Stream evicted", diff --git a/packages/appkit/src/type-generator/query-registry.ts b/packages/appkit/src/type-generator/query-registry.ts index 196690c2d..9bbeb01e9 100644 --- a/packages/appkit/src/type-generator/query-registry.ts +++ b/packages/appkit/src/type-generator/query-registry.ts @@ -1,6 +1,7 @@ import fs from "node:fs/promises"; import path from "node:path"; import { WorkspaceClient } from "@databricks/sdk-experimental"; +import { tableFromIPC } from "apache-arrow"; import pc from "picocolors"; import { createLogger } from "../logging/logger"; import { CACHE_VERSION, hashSQL, loadCache, saveCache } from "./cache"; @@ -129,18 +130,69 @@ function formatParametersType(sql: string): string { : "Record"; } +/** + * Decode a base64 Arrow IPC attachment from a DESCRIBE QUERY response and + * extract column metadata. Returns the same shape as rows parsed from the + * legacy data_array path. 
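+ *
+ * For example (illustrative values only), DESCRIBE QUERY over
+ * `SELECT id, name FROM users` produces rows shaped like:
+ *   ("id",   "bigint", null)
+ *   ("name", "string", "display name")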
+ * + * IMPORTANT: a DESCRIBE QUERY response is itself a result *table* with rows + * shaped like `(col_name, data_type, comment)` describing the user query's + * output schema. We must read those rows — NOT `table.schema.fields`, which + * would describe DESCRIBE QUERY's own output (`col_name`, `data_type`, + * `comment`) and yield bogus types for every query. + */ +function columnsFromArrowAttachment( + attachment: string, +): Array<{ name: string; type_name: string; comment: string | undefined }> { + const buf = Buffer.from(attachment, "base64"); + const table = tableFromIPC(buf); + return table.toArray().map((row) => { + const obj = row.toJSON() as { + col_name?: unknown; + data_type?: unknown; + comment?: unknown; + }; + return { + name: typeof obj.col_name === "string" ? obj.col_name : "", + type_name: + typeof obj.data_type === "string" + ? obj.data_type.toUpperCase() + : "STRING", + comment: + typeof obj.comment === "string" && obj.comment !== "" + ? obj.comment + : undefined, + }; + }); +} + export function convertToQueryType( result: DatabricksStatementExecutionResponse, sql: string, queryName: string, ): { type: string; hasResults: boolean } { const dataRows = result.result?.data_array || []; - const columns = dataRows.map((row) => ({ + let columns = dataRows.map((row) => ({ name: row[0] || "", type_name: row[1]?.toUpperCase() || "STRING", comment: row[2] || undefined, })); + // Fallback: serverless warehouses return ARROW_STREAM format with an inline + // base64 attachment instead of data_array. Decode the Arrow IPC rows (the + // DESCRIBE QUERY result table) to extract column names and types. + if (columns.length === 0 && result.result?.attachment) { + logger.debug("data_array empty, decoding Arrow IPC attachment for schema"); + try { + columns = columnsFromArrowAttachment(result.result.attachment); + } catch (err) { + logger.warn( + "Failed to decode Arrow IPC attachment: %s", + err instanceof Error ? err.message : String(err), + ); + } + } + const paramsType = formatParametersType(sql); // generate result fields with JSDoc @@ -379,6 +431,16 @@ export async function generateQueriesFromDescribe( `Describing ${total} ${total === 1 ? "query" : "queries"} (0/${total})`, ); + // Some serverless warehouses reject JSON_ARRAY+INLINE for DESCRIBE — and + // they signal the rejection two different ways: either as a thrown error, + // or as a `status.state === "FAILED"` response. Both paths funnel through + // this matcher so we can retry with ARROW_STREAM+INLINE consistently. + const looksLikeFormatRejection = (msg: string): boolean => + msg.includes("JSON_ARRAY") && + (msg.includes("not supported") || + msg.includes("INVALID_PARAMETER_VALUE") || + msg.includes("NOT_IMPLEMENTED")); + const describeOne = async ({ index, queryName, @@ -386,10 +448,56 @@ export async function generateQueriesFromDescribe( sqlHash, cleanedSql, }: (typeof uncachedQueries)[number]): Promise => { - const result = (await client.statementExecution.executeStatement({ - statement: `DESCRIBE QUERY ${cleanedSql}`, - warehouse_id: warehouseId, - })) as DatabricksStatementExecutionResponse; + // Prefer JSON_ARRAY + INLINE so `data_array` parsing works directly. + // Some serverless warehouses reject this combination — fall back to + // ARROW_STREAM + INLINE (still inline, just a different format) and + // let `convertToQueryType` decode the inline attachment. Forcing + // INLINE on the retry avoids EXTERNAL_LINKS, which would silently + // produce empty `data_array` and degrade types to `unknown`. 
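+    // The ladder, schematically (both rungs INLINE; only the format differs):
+    //   1. JSON_ARRAY   + INLINE -> rows arrive in result.data_array
+    //   2. ARROW_STREAM + INLINE -> rows arrive in result.attachment
+    //      (base64-encoded Arrow IPC, decoded by convertToQueryType)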
diff --git a/packages/appkit/src/type-generator/tests/query-registry.test.ts b/packages/appkit/src/type-generator/tests/query-registry.test.ts
index 8d46f98e9..63a5636b9 100644
--- a/packages/appkit/src/type-generator/tests/query-registry.test.ts
+++ b/packages/appkit/src/type-generator/tests/query-registry.test.ts
@@ -1,4 +1,24 @@
-import { describe, expect, test } from "vitest";
+import { Table, tableToIPC, vectorFromArray } from "apache-arrow";
+import { describe, expect, test, vi } from "vitest";
+
+const { mockLoggerWarn, mockLoggerDebug } = vi.hoisted(() => ({
+  mockLoggerWarn: vi.fn(),
+  mockLoggerDebug: vi.fn(),
+}));
+vi.mock("../../logging/logger", () => ({
+  createLogger: vi.fn(() => ({
+    debug: mockLoggerDebug,
+    info: vi.fn(),
+    warn: mockLoggerWarn,
+    error: vi.fn(),
+    event: vi.fn(() => ({
+      set: vi.fn().mockReturnThis(),
+      setComponent: vi.fn().mockReturnThis(),
+      setContext: vi.fn().mockReturnThis(),
+    })),
+  })),
+}));
+
 import {
   convertToQueryType,
   defaultForType,
@@ -11,6 +31,20 @@ import {
 } from "../query-registry";
 import type { DatabricksStatementExecutionResponse } from "../types";
 
+// Build a base64 Arrow IPC payload that mimics a DESCRIBE QUERY response —
+// a result *table* with columns (col_name, data_type, comment) describing
+// the user query's output schema.
+function describeQueryAttachment(
+  rows: Array<{ col_name: string; data_type: string; comment: string | null }>,
+): string {
+  const table = new Table({
+    col_name: vectorFromArray(rows.map((r) => r.col_name)),
+    data_type: vectorFromArray(rows.map((r) => r.data_type)),
+    comment: vectorFromArray(rows.map((r) => r.comment ?? "")),
+  });
+  return Buffer.from(tableToIPC(table, "stream")).toString("base64");
+}
+
 describe("normalizeTypeName", () => {
   test("returns simple types unchanged", () => {
     expect(normalizeTypeName("STRING")).toBe("STRING");
@@ -346,6 +380,107 @@ SELECT * FROM users WHERE date = :startDate AND count = :count AND name = :name`
     );
     expect(hasResults).toBe(false);
   });
+
+  describe("ARROW_STREAM attachment fallback (serverless warehouses)", () => {
+    test("decodes column metadata from Arrow IPC data rows, not schema fields", () => {
+      // Critical regression test: it would be a bug to read
+      // `table.schema.fields` here, which would generate types like
+      // { col_name: string; data_type: string; comment: string } for every
+      // query (those are DESCRIBE QUERY's own output columns). We must read
+      // the data rows.
+      const attachment = describeQueryAttachment([
+        { col_name: "user_id", data_type: "BIGINT", comment: null },
+        { col_name: "name", data_type: "STRING", comment: "display name" },
+        { col_name: "active", data_type: "BOOLEAN", comment: null },
+      ]);
+      const response: DatabricksStatementExecutionResponse = {
+        statement_id: "test-arrow",
+        status: { state: "SUCCEEDED" },
+        result: { attachment },
+      };
+
+      const { type, hasResults } = convertToQueryType(
+        response,
+        "SELECT user_id, name, active FROM users",
+        "users",
+      );
+
+      expect(hasResults).toBe(true);
+      // Real query columns appear in the generated type:
+      expect(type).toContain("user_id: number");
+      expect(type).toContain("name: string");
+      expect(type).toContain("active: boolean");
+      // Column comments survive:
+      expect(type).toContain("/** display name");
+      // The DESCRIBE QUERY metadata column names must NOT leak as user types:
+      expect(type).not.toContain("col_name: string");
+      expect(type).not.toContain("data_type: string");
+    });
+
+    test("normalizes lowercase data_type values to uppercase", () => {
+      const attachment = describeQueryAttachment([
+        { col_name: "id", data_type: "int", comment: null },
+      ]);
+      const response: DatabricksStatementExecutionResponse = {
+        statement_id: "test-arrow",
+        status: { state: "SUCCEEDED" },
+        result: { attachment },
+      };
+
+      const { type } = convertToQueryType(response, "SELECT 1", "test");
+      expect(type).toContain("@sqlType INT");
+      expect(type).toContain("id: number");
+    });
+
+    test("prefers data_array over attachment when both are present", () => {
+      const attachment = describeQueryAttachment([
+        { col_name: "from_arrow", data_type: "STRING", comment: null },
+      ]);
+      const response: DatabricksStatementExecutionResponse = {
+        statement_id: "test-both",
+        status: { state: "SUCCEEDED" },
+        result: {
+          data_array: [["from_data_array", "INT", null]],
+          attachment,
+        },
+      };
+
+      const { type } = convertToQueryType(response, "SELECT 1", "test");
+      expect(type).toContain("from_data_array: number");
+      expect(type).not.toContain("from_arrow");
+    });
+
+    test("logs a warning and yields the unknown-result fallback on malformed attachment", () => {
+      mockLoggerWarn.mockClear();
+      const response: DatabricksStatementExecutionResponse = {
+        statement_id: "test-bad",
+        status: { state: "SUCCEEDED" },
+        result: { attachment: "not-valid-arrow-ipc" },
+      };
+
+      const { hasResults, type } = convertToQueryType(
+        response,
+        "SELECT 1",
+        "test",
+      );
+
+      // No columns extracted → unknown-result type, hasResults false.
+      expect(hasResults).toBe(false);
+      expect(type).toContain("unknown");
+      // None of DESCRIBE QUERY's metadata column names should leak in as
+      // user-facing type fields — that would mean the parser swallowed
+      // the failure and produced bogus columns instead.
+      expect(type).not.toContain("col_name");
+      expect(type).not.toContain("data_type");
+
+      // The warning must fire so a regression that silently produces empty
+      // types (no telemetry signal) fails this test.
+      expect(mockLoggerWarn).toHaveBeenCalledWith(
+        expect.stringContaining("Failed to decode Arrow IPC attachment"),
+        expect.any(String),
+      );
+    });
+  });
 });
 
 describe("inferParameterTypes", () => {
diff --git a/packages/appkit/src/type-generator/types.ts b/packages/appkit/src/type-generator/types.ts
index 5af43591a..9a591f512 100644
--- a/packages/appkit/src/type-generator/types.ts
+++ b/packages/appkit/src/type-generator/types.ts
@@ -12,6 +12,8 @@ export interface DatabricksStatementExecutionResponse {
   };
   result?: {
     data_array?: (string | null)[][];
+    /** Base64-encoded Arrow IPC bytes (returned by serverless warehouses using ARROW_STREAM format) */
+    attachment?: string;
   };
 }
diff --git a/packages/shared/package.json b/packages/shared/package.json
index 27d268ca3..542f7a965 100644
--- a/packages/shared/package.json
+++ b/packages/shared/package.json
@@ -40,6 +40,7 @@
     "ajv": "8.17.1",
     "ajv-formats": "3.0.1",
     "@clack/prompts": "1.0.1",
-    "commander": "12.1.0"
+    "commander": "12.1.0",
+    "zod": "4.3.6"
   }
 }
diff --git a/packages/shared/src/index.ts b/packages/shared/src/index.ts
index 9829729a7..d036e0dbd 100644
--- a/packages/shared/src/index.ts
+++ b/packages/shared/src/index.ts
@@ -4,4 +4,5 @@ export * from "./execute";
 export * from "./genie";
 export * from "./plugin";
 export * from "./sql";
+export * from "./sse/analytics";
 export * from "./tunnel";
diff --git a/packages/shared/src/sse/analytics.test.ts b/packages/shared/src/sse/analytics.test.ts
new file mode 100644
index 000000000..f66437c36
--- /dev/null
+++ b/packages/shared/src/sse/analytics.test.ts
@@ -0,0 +1,87 @@
+import { describe, expect, test } from "vitest";
+import {
+  AnalyticsSseMessage,
+  makeArrowMessage,
+  makeResultMessage,
+} from "./analytics";
+
+describe("AnalyticsSseMessage schema", () => {
+  test("accepts a result message with rows", () => {
+    const parsed = AnalyticsSseMessage.parse({
+      type: "result",
+      data: [{ id: 1, name: "alice" }],
+    });
+    expect(parsed.type).toBe("result");
+  });
+
+  test("accepts a result message with no data (empty result)", () => {
+    expect(() => AnalyticsSseMessage.parse({ type: "result" })).not.toThrow();
+  });
+
+  test("accepts an arrow message with warehouse statement_id", () => {
+    const parsed = AnalyticsSseMessage.parse({
+      type: "arrow",
+      statement_id: "stmt-1",
+    });
+    expect(parsed.type).toBe("arrow");
+  });
+
+  test("accepts an arrow message with synthetic inline- id", () => {
+    // Inline Arrow payloads are stashed server-side and surfaced through the
+    // same `arrow` message variant — the `inline-` prefix tells the
+    // /arrow-result handler to drain the stash instead of hitting the
+    // warehouse. The schema must accept both id shapes transparently.
+    const parsed = AnalyticsSseMessage.parse({
+      type: "arrow",
+      statement_id: "inline-abc-123",
+    });
+    expect(parsed.statement_id).toBe("inline-abc-123");
+  });
+
+  test("rejects an arrow message with empty statement_id", () => {
+    expect(() =>
+      AnalyticsSseMessage.parse({ type: "arrow", statement_id: "" }),
+    ).toThrow();
+  });
+
+  test("rejects an arrow message with no statement_id", () => {
+    expect(() => AnalyticsSseMessage.parse({ type: "arrow" })).toThrow();
+  });
+
+  test("rejects the retired arrow_inline message type", () => {
+    // arrow_inline was the prior wire shape (base64 payload on the SSE
+    // channel). The current protocol routes all Arrow payloads through
+    // /arrow-result; the type must no longer parse.
+    expect(() =>
+      AnalyticsSseMessage.parse({ type: "arrow_inline", attachment: "AQID" }),
+    ).toThrow();
+  });
+
+  test("rejects an unknown type", () => {
+    expect(() =>
+      AnalyticsSseMessage.parse({ type: "unknown_kind", foo: "bar" }),
+    ).toThrow();
+  });
+
+  test("safeParse returns success: false for malformed payloads", () => {
+    const r = AnalyticsSseMessage.safeParse({ type: "arrow" });
+    expect(r.success).toBe(false);
+  });
+});
+
+describe("typed builders", () => {
+  test("makeResultMessage roundtrips through the schema", () => {
+    const msg = makeResultMessage([{ id: 1 }], { statement_id: "s-1" });
+    expect(() => AnalyticsSseMessage.parse(msg)).not.toThrow();
+  });
+
+  test("makeArrowMessage roundtrips through the schema", () => {
+    const msg = makeArrowMessage("stmt-2");
+    expect(() => AnalyticsSseMessage.parse(msg)).not.toThrow();
+  });
+
+  test("makeArrowMessage accepts synthetic inline- ids", () => {
+    const msg = makeArrowMessage("inline-some-uuid");
+    expect(() => AnalyticsSseMessage.parse(msg)).not.toThrow();
+  });
+});
diff --git a/packages/shared/src/sse/analytics.ts b/packages/shared/src/sse/analytics.ts
new file mode 100644
index 000000000..f37d38a51
--- /dev/null
+++ b/packages/shared/src/sse/analytics.ts
@@ -0,0 +1,79 @@
+import { z } from "zod";
+
+/**
+ * Wire protocol for analytics SSE messages emitted by `/api/analytics/query`.
+ *
+ * These schemas are the single source of truth for the contract between the
+ * server (`AnalyticsPlugin._handleQueryRoute`) and the client
+ * (`useAnalyticsQuery`). Both sides validate with the same schema:
+ *
+ * - Server uses the typed builders (`makeResultMessage`, `makeArrowMessage`)
+ *   to construct messages with compile-time guarantees that all required
+ *   fields are present.
+ * - Client calls `AnalyticsSseMessage.parse(JSON.parse(event.data))` to fail
+ *   loudly on a malformed payload instead of silently treating an undefined
+ *   field as data.
+ *
+ * Arrow payloads — inline or external-links — never traverse the SSE control
+ * channel; both flow through `/api/analytics/arrow-result/:jobId` and are
+ * differentiated by an `inline-` prefix on the job id (see
+ * `InlineArrowStash`). The wire shape from the client's perspective is
+ * therefore uniform: an `arrow` message carries an id, and the client fetches.
+ *
+ * Adding a new message variant requires a schema update here, which keeps
+ * server and client in lockstep.
+ */
+
+/** Successful row-shaped result (JSON_ARRAY format, or empty results). */
+export const AnalyticsResultMessage = z.object({
+  type: z.literal("result"),
+  data: z.array(z.record(z.string(), z.unknown())).optional(),
+  // Status is opaque metadata forwarded from the warehouse — keep it as
+  // `unknown` so we don't bake the SDK's detailed shape into the contract.
+  status: z.unknown().optional(),
+  statement_id: z.string().optional(),
+});
+export type AnalyticsResultMessage = z.infer<typeof AnalyticsResultMessage>;
+
+/**
+ * ARROW_STREAM result delivered via /arrow-result/:jobId. The id is either:
+ * - the warehouse-issued `statement_id` for EXTERNAL_LINKS responses, or
+ * - a synthetic `inline-` id pointing at the server-side
+ *   `InlineArrowStash` for INLINE responses.
+ *
+ * Both shapes are fetched the same way; the prefix tells the route handler
+ * which path to take.
+ */
+export const AnalyticsArrowMessage = z.object({
+  type: z.literal("arrow"),
+  statement_id: z.string().min(1),
+  status: z.unknown().optional(),
+});
+export type AnalyticsArrowMessage = z.infer<typeof AnalyticsArrowMessage>;
+
+/** Discriminated union of every message the analytics SSE stream may emit. */
+export const AnalyticsSseMessage = z.discriminatedUnion("type", [
+  AnalyticsResultMessage,
+  AnalyticsArrowMessage,
+]);
+export type AnalyticsSseMessage = z.infer<typeof AnalyticsSseMessage>;
+
+// ────────────────────────────────────────────────────────────────────────────
+// Typed builders — call from the server route handler. The compiler enforces
+// that every required field is supplied, and the return type narrows so
+// downstream code (executeStream / SSE writer) keeps full type information.
+// ────────────────────────────────────────────────────────────────────────────
+
+export function makeResultMessage(
+  data: Record<string, unknown>[] | undefined,
+  extras: { status?: unknown; statement_id?: string } = {},
+): AnalyticsResultMessage {
+  return { type: "result", data, ...extras };
+}
+
+export function makeArrowMessage(
+  statement_id: string,
+  extras: { status?: unknown } = {},
+): AnalyticsArrowMessage {
+  return { type: "arrow", statement_id, ...extras };
+}
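Taken together, the schema and builders imply a very small client loop. A consumption sketch under stated assumptions: the `/arrow-result` URL shape is taken from the comments above, while the callback names and the use of `fetch` are illustrative only:

```ts
import { tableFromIPC, type Table } from "apache-arrow";
import { AnalyticsSseMessage } from "./analytics";

// Sketch of a client-side consumer. Parse-first: a malformed payload throws
// here instead of flowing downstream as undefined fields.
export async function handleAnalyticsEvent(
  raw: string,
  onRows: (rows: Record<string, unknown>[]) => void, // illustrative callback
  onTable: (table: Table) => void, // illustrative callback
): Promise<void> {
  const msg = AnalyticsSseMessage.parse(JSON.parse(raw));
  if (msg.type === "result") {
    onRows(msg.data ?? []);
    return;
  }
  // "arrow": warehouse ids and synthetic `inline-` ids are fetched the same
  // way; the server dispatches on the prefix.
  const res = await fetch(`/api/analytics/arrow-result/${msg.statement_id}`);
  onTable(tableFromIPC(new Uint8Array(await res.arrayBuffer())));
}
```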
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 4db8fbe3c..d7d755168 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -299,6 +299,9 @@ importers:
       '@types/semver':
         specifier: 7.7.1
        version: 7.7.1
+      apache-arrow:
+        specifier: 21.1.0
+        version: 21.1.0
       dotenv:
         specifier: 16.6.1
         version: 16.6.1
@@ -554,6 +557,9 @@ importers:
       commander:
         specifier: 12.1.0
         version: 12.1.0
+      zod:
+        specifier: 4.3.6
+        version: 4.3.6
     devDependencies:
       '@types/express':
         specifier: 4.17.23
         version: 4.17.23
@@ -5564,7 +5570,7 @@ packages:
   basic-ftp@5.0.5:
     resolution: {integrity: sha512-4Bcg1P8xhUuqcii/S0Z9wiHIrQVPMermM1any+MX5GeGD7faD3/msQUDGLol9wOcz4/jbg/WJnGqoJF6LiBdtg==}
     engines: {node: '>=10.0.0'}
-    deprecated: Security vulnerability fixed in 5.2.0, please upgrade
+    deprecated: Security vulnerability fixed in 5.2.1, please upgrade
 
   batch@0.6.1:
     resolution: {integrity: sha512-x+VAiMRL6UPkx+kudNvxTl6hB2XNNCG2r+7wixVfIYwu/2HKRXimwQyaumLjMveWvT2Hkd/cAJw+QBMfJ/EKVw==}
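For the server side, a hypothetical wiring of the builders to the stash. Only `makeArrowMessage`, `makeResultMessage`, and the `inline-` id contract come from this patch; the stash parameter shape, `send`, and the empty-result fallback on a rejected put are stand-ins:

```ts
import { makeArrowMessage, makeResultMessage } from "./analytics";

// Hypothetical server-side wiring. `stash` mirrors InlineArrowStash's put()
// contract as exercised by the tests above (synthetic "inline-" id, or null
// when the payload will not fit); `send` stands in for the SSE writer.
function emitInlineArrow(
  send: (msg: unknown) => void,
  stash: { put(userId: string, bytes: Uint8Array): string | null },
  userId: string,
  arrowBytes: Uint8Array,
): void {
  const jobId = stash.put(userId, arrowBytes);
  if (jobId !== null) {
    // Client fetches /api/analytics/arrow-result/inline-… and the handler
    // drains the stash instead of calling the warehouse.
    send(makeArrowMessage(jobId));
  } else {
    // Fallback policy is an assumption: emit an empty result rather than
    // pushing bulk Arrow bytes onto the 1 MiB SSE control channel.
    send(makeResultMessage([]));
  }
}
```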