From a54fd472937280f8e09d65bd70b33ea7dea704ca Mon Sep 17 00:00:00 2001
From: Mark Turansky
Date: Thu, 21 May 2026 00:09:14 +0000
Subject: [PATCH 1/3] Add Events API specification with AG-UI event compression

Adds comprehensive Events API documentation to ambient-model.spec.md:

- Documents all 33 AG-UI event types across 8 semantic categories
  (Run Lifecycle, Text Messages, Tool Calls, Thinking, Reasoning, State, etc.)
- Defines context-aware event compression strategy to prevent storage bloat
  from token-level streaming (5:1 to 20:1 compression ratios)
- Extends SessionMessage schema with compression metadata:
  - completed_at: timestamp of last event in compressed group
  - event_count: number of raw events compressed (1 = uncompressed)
- Compression rules: accumulate _CONTENT and _ARGS events within
  message_id/tool_call_id contexts; flush on boundary events or context change
- Backward compatible: compression is opt-in; legacy uncompressed events
  supported with event_count=1
- Updates ERD diagram to reflect new SessionMessage fields
- Adds implementation status to coverage matrix

Implementation will be in runner gRPC clients (Python/Go) to compress events
before calling PushSessionMessage. API server and database accept both
compressed and uncompressed events transparently.

Co-Authored-By: Claude Sonnet 4.5 --- specs/api/ambient-model.spec.md | 242 +++++++++++++++++++++++++++++++- 1 file changed, 240 insertions(+), 2 deletions(-) diff --git a/specs/api/ambient-model.spec.md b/specs/api/ambient-model.spec.md index eee4c7250..1eb79b492 100644 --- a/specs/api/ambient-model.spec.md +++ b/specs/api/ambient-model.spec.md @@ -151,9 +151,11 @@ erDiagram string ID PK string session_id FK int seq "monotonic within session" - string event_type "user | assistant | tool_use | tool_result | system | error" - string payload "message body or JSON-encoded event" + string event_type "AG-UI event type (33 types: TEXT_MESSAGE_START, TOOL_CALL_START, etc.)" + string payload "JSON-encoded event payload" time created_at + time completed_at "nullable — last event timestamp for compressed events" + int32 event_count "number of raw events compressed; 1 = uncompressed" } %% ── RBAC ───────────────────────────────────────────────────────────────── @@ -337,6 +339,240 @@ SessionMessages are never deleted or edited. They are the canonical record of wh The runner's `/events/{thread_id}` endpoint registers an asyncio queue into `bridge._active_streams[thread_id]` and streams every AG-UI event as SSE until `RUN_FINISHED` / `RUN_ERROR` or client disconnect. The API server's `/sessions/{id}/events` proxies this from the runner pod for the active session, routing via pod IP or session service. Keepalive pings fire every 30s to hold the connection open. +### Events API — AG-UI Event Protocol + +The Events API provides structured access to the AG-UI event stream emitted by runner pods during session execution. Events are the atomic units of a conversation: text deltas, tool calls, thinking blocks, state updates, and control flow markers. + +#### AG-UI Event Types + +Events follow the [AG-UI protocol](https://github.com/anthropics/ag-ui), a streaming protocol for agentic UIs. 
The protocol defines 33 event types organized into semantic categories: + +| Category | Event Types | Purpose | +|----------|-------------|---------| +| **Run Lifecycle** | `RUN_STARTED`, `RUN_FINISHED`, `RUN_ERROR` | Session execution boundaries | +| **Step Lifecycle** | `STEP_STARTED`, `STEP_FINISHED` | Multi-step execution boundaries (LangGraph pattern) | +| **Text Messages** | `TEXT_MESSAGE_START`, `TEXT_MESSAGE_CONTENT`, `TEXT_MESSAGE_END`, `TEXT_MESSAGE_CHUNK` | User or assistant text content | +| **Tool Calls** | `TOOL_CALL_START`, `TOOL_CALL_ARGS`, `TOOL_CALL_END`, `TOOL_CALL_CHUNK`, `TOOL_CALL_RESULT` | Tool invocations and results | +| **Thinking** | `THINKING_START`, `THINKING_END`, `THINKING_TEXT_MESSAGE_START`, `THINKING_TEXT_MESSAGE_CONTENT`, `THINKING_TEXT_MESSAGE_END` | Extended thinking blocks (Claude 4+ models) | +| **Reasoning** | `REASONING_START`, `REASONING_END`, `REASONING_MESSAGE_START`, `REASONING_MESSAGE_CONTENT`, `REASONING_MESSAGE_END`, `REASONING_MESSAGE_CHUNK`, `REASONING_ENCRYPTED_VALUE` | Reasoning trace (Gemini 2.5+ Deep Research) | +| **State** | `STATE_SNAPSHOT`, `STATE_DELTA`, `MESSAGES_SNAPSHOT`, `ACTIVITY_SNAPSHOT`, `ACTIVITY_DELTA` | Bidirectional state sync (LangGraph pattern) | +| **Custom** | `RAW`, `CUSTOM` | Framework-specific or debug events | + +Each event carries: +- `type` — event type from the enum above +- `run_id` — AG-UI run identifier (scoped to a single execution turn) +- `thread_id` — session identifier (maps to `session_id` in DB) +- Payload fields specific to the event type (e.g., `message_id`, `tool_id`, `content`, `args`) + +**Start/End Pairing:** Events with `_START` / `_END` suffixes define stream boundaries. Content events (`_CONTENT`, `_ARGS`, `_CHUNK`) appear between their corresponding start/end markers. 
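The start/end pairing rule can be checked mechanically. The following is a minimal Python sketch, not part of the spec: it assumes events arrive as dicts with a `type` key, and the name `check_pairing` is hypothetical. It only covers `_CONTENT` and `_ARGS` events; `_CHUNK` events are left out for brevity.

```python
from typing import Iterable

# Suffixes that must appear between a matching _START/_END pair.
# (_CHUNK is intentionally not checked in this sketch.)
CONTENT_SUFFIXES = ("_CONTENT", "_ARGS")


def check_pairing(events: Iterable[dict]) -> list[str]:
    """Return a list of human-readable pairing violations (empty if none)."""
    open_streams: list[str] = []  # stack of stream prefixes, e.g. "TEXT_MESSAGE"
    problems: list[str] = []
    for index, event in enumerate(events):
        event_type = event["type"]
        if event_type.endswith("_START"):
            # RUN_STARTED / STEP_STARTED do not match: they end in "_STARTED".
            open_streams.append(event_type[: -len("_START")])
        elif event_type.endswith("_END"):
            prefix = event_type[: -len("_END")]
            if open_streams and open_streams[-1] == prefix:
                open_streams.pop()
            else:
                problems.append(f"{index}: {event_type} without matching start")
        else:
            for suffix in CONTENT_SUFFIXES:
                if event_type.endswith(suffix):
                    prefix = event_type[: -len(suffix)]
                    if prefix not in open_streams:
                        problems.append(f"{index}: {event_type} outside its stream")
    # Anything still on the stack never received its _END marker.
    problems.extend(f"unclosed: {prefix}_START" for prefix in open_streams)
    return problems
```

A stack is used so that nested streams such as `THINKING_START` wrapping `THINKING_TEXT_MESSAGE_START` close in the reverse order they opened.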
+ +**Example sequence:** +``` +RUN_STARTED +├── TEXT_MESSAGE_START (role=assistant, message_id=msg_abc) +│ ├── TEXT_MESSAGE_CONTENT (content="Let me") +│ ├── TEXT_MESSAGE_CONTENT (content=" check") +│ └── TEXT_MESSAGE_END +├── TOOL_CALL_START (tool_name=Read, tool_call_id=tc_123) +│ ├── TOOL_CALL_ARGS (args='{"file') +│ ├── TOOL_CALL_ARGS (args='_path":') +│ ├── TOOL_CALL_ARGS (args='"/app/file.txt"}') +│ └── TOOL_CALL_END +├── TOOL_CALL_RESULT (tool_call_id=tc_123, result="file contents...") +└── RUN_FINISHED +``` + +#### Event Compression + +AG-UI events stream at **token-level granularity** — a single word or JSON fragment can emit one event. Without compression, sessions generate thousands of tiny rows (e.g., `TEXT_MESSAGE_CONTENT` with `"Let"`, then `" me"`, then `" check"`). This creates storage bloat and query overhead. + +**Compression Strategy — Context-Aware Accumulation:** + +Events are compressed **before persistence** by the runner's gRPC client. Compression groups consecutive events sharing the same **context** (message_id, tool_call_id, role). When the context changes or a boundary event arrives, the accumulated content is flushed as a single compressed event. 
+ +**Compression Rules:** + +| Event Type | Compression Behavior | +|------------|---------------------| +| `TEXT_MESSAGE_START` | **Boundary** — flushes prior accumulated content; starts new message context | +| `TEXT_MESSAGE_CONTENT` | **Accumulate** — append `content` to buffer within current message context | +| `TEXT_MESSAGE_END` | **Boundary** — flushes accumulated content; ends message context | +| `TOOL_CALL_START` | **Boundary** — starts new tool call context | +| `TOOL_CALL_ARGS` | **Accumulate** — append `args` fragment to buffer within current tool context | +| `TOOL_CALL_END` | **Boundary** — flushes accumulated args; ends tool context | +| `THINKING_TEXT_MESSAGE_CONTENT` | **Accumulate** — within thinking message context | +| `REASONING_MESSAGE_CONTENT` | **Accumulate** — within reasoning message context | +| All `_START`, `_END`, `_RESULT`, run/step lifecycle | **Never compressed** — stored as individual events | + +**Context Definition:** +- Text messages: `(message_id, role)` +- Tool calls: `(tool_call_id)` +- Thinking: `(message_id, thinking_id)` +- Reasoning: `(message_id, reasoning_id)` + +**Flush Triggers:** +1. Context change (new message_id / tool_call_id) +2. Boundary event (`_START`, `_END`) +3. Event type transition (TEXT → TOOL, TOOL → TEXT) +4. Buffer size threshold (optional; e.g., 10 KB per compressed event) +5. 
Time threshold (optional; e.g., 5 seconds idle) + +**Metadata Preservation:** +- `created_at` — timestamp of the **first** event in the compressed group +- `completed_at` — timestamp of the **last** event (new field on `SessionMessage`) +- `event_count` — number of raw events compressed into this row (new field) + +**Example — Before Compression:** +```json +{"seq":10, "event_type":"TEXT_MESSAGE_START", "payload":"{\"message_id\":\"msg_1\",\"role\":\"assistant\"}"} +{"seq":11, "event_type":"TEXT_MESSAGE_CONTENT", "payload":"{\"content\":\"Let\"}"} +{"seq":12, "event_type":"TEXT_MESSAGE_CONTENT", "payload":"{\"content\":\" me\"}"} +{"seq":13, "event_type":"TEXT_MESSAGE_CONTENT", "payload":"{\"content\":\" check\"}"} +{"seq":14, "event_type":"TEXT_MESSAGE_END", "payload":"{}"} +``` + +**After Compression:** +```json +{"seq":10, "event_type":"TEXT_MESSAGE_START", "payload":"{\"message_id\":\"msg_1\",\"role\":\"assistant\"}"} +{"seq":11, "event_type":"TEXT_MESSAGE_CONTENT", "payload":"{\"content\":\"Let me check\"}", "event_count":3, "completed_at":"2026-05-21T..."} +{"seq":12, "event_type":"TEXT_MESSAGE_END", "payload":"{}"} +``` + +**Space Savings:** Typical compression ratios range from **5:1** (simple text) to **20:1** (complex tool arguments with many JSON fragments). + +**Backward Compatibility:** Existing queries and APIs continue to work. Compressed events are decompressed on read if clients require token-level replay (future enhancement). 
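The accumulate-and-flush rules above can be sketched in Python as follows. This is an illustration under stated assumptions, not the runner's actual implementation: the class name `EventCompressor` and the `emit` callback are hypothetical, payloads are plain dicts, and timestamps are floats. It relies on `_START` / `_END` boundaries and event type transitions to separate contexts instead of tracking `message_id` explicitly, and it omits the optional size and time thresholds.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional

# Accumulating event types mapped to the payload field that carries the fragment.
ACCUMULATE_FIELDS = {
    "TEXT_MESSAGE_CONTENT": "content",
    "TOOL_CALL_ARGS": "args",
    "THINKING_TEXT_MESSAGE_CONTENT": "content",
    "REASONING_MESSAGE_CONTENT": "content",
}


@dataclass
class EventCompressor:
    """Buffers consecutive fragment events and flushes them as one row."""

    emit: Callable[[dict], None]  # hypothetical sink, e.g. a gRPC push wrapper
    _event_type: Optional[str] = None
    _fragments: list = field(default_factory=list)
    _created_at: Optional[float] = None
    _completed_at: Optional[float] = None

    def push(self, event_type: str, payload: dict, timestamp: float) -> None:
        field_name = ACCUMULATE_FIELDS.get(event_type)
        if field_name is None:
            # Boundary or lifecycle event: flush the buffer, then pass it through.
            self.flush()
            self.emit({"event_type": event_type, "payload": payload,
                       "created_at": timestamp, "completed_at": None,
                       "event_count": 1})
            return
        if event_type != self._event_type:
            # An event type transition also closes the current group.
            self.flush()
            self._event_type = event_type
            self._created_at = timestamp
        self._fragments.append(payload[field_name])
        self._completed_at = timestamp

    def flush(self) -> None:
        if not self._fragments:
            return
        field_name = ACCUMULATE_FIELDS[self._event_type]
        count = len(self._fragments)
        self.emit({
            "event_type": self._event_type,
            "payload": {field_name: "".join(self._fragments)},
            "created_at": self._created_at,
            # Single-event groups are treated as uncompressed here,
            # so completed_at stays None for them.
            "completed_at": self._completed_at if count > 1 else None,
            "event_count": count,
        })
        self._event_type = None
        self._fragments = []
        self._created_at = None
        self._completed_at = None
```

Feeding the five events of the "before" example through `push` with `emit` set to a list's `append` yields three rows, the middle one carrying the joined content and an `event_count` of 3. A caller would also need to invoke `flush()` when the run ends so that a trailing buffer is not lost.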
+ +#### Storage Model + +Compressed events are stored in the existing `session_messages` table with schema extensions: + +```sql +CREATE TABLE session_messages ( + id VARCHAR(36) PRIMARY KEY, + session_id VARCHAR(36) NOT NULL REFERENCES sessions(id), + seq BIGINT NOT NULL, + event_type VARCHAR(255) NOT NULL, + payload TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL, + completed_at TIMESTAMPTZ, -- NEW: timestamp of last event in compressed group + event_count INT DEFAULT 1, -- NEW: number of raw events compressed (1 = uncompressed) + UNIQUE(session_id, seq) +); + +CREATE INDEX idx_session_messages_session_id ON session_messages(session_id); +CREATE INDEX idx_session_messages_event_type ON session_messages(event_type); +``` + +**Field Semantics:** + +| Field | Description | +|-------|-------------| +| `seq` | Monotonic sequence within session; gaps allowed after compression | +| `event_type` | AG-UI event type enum (see table above) | +| `payload` | JSON-encoded event payload; structure varies by event type | +| `created_at` | First event timestamp (for compressed events) or single event timestamp | +| `completed_at` | Last event timestamp for compressed events; `NULL` for uncompressed | +| `event_count` | Number of raw events compressed; `1` = uncompressed, `>1` = compressed | + +**Backward Compatibility:** Existing rows have `completed_at=NULL` and `event_count=1`. Migration backfills default values without data loss. 
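Because `event_count` records how many raw events each row represents, the achieved compression ratio can be read back from the table itself. A small Python sketch, using an in-memory SQLite stand-in for the DDL above; the SQLite column types, the omitted foreign key, the sample rows, and the helper names `compression_ratio` and `demo_connection` are all assumptions made for illustration:

```python
import sqlite3


def compression_ratio(conn: sqlite3.Connection, session_id: str) -> float:
    """Raw events represented per stored row for one session (0.0 if no rows)."""
    raw, stored = conn.execute(
        "SELECT COALESCE(SUM(event_count), 0), COUNT(*) "
        "FROM session_messages WHERE session_id = ?",
        (session_id,),
    ).fetchone()
    return raw / stored if stored else 0.0


def demo_connection() -> sqlite3.Connection:
    """In-memory stand-in for the table above (SQLite types, no foreign key)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE session_messages (
            id TEXT PRIMARY KEY,
            session_id TEXT NOT NULL,
            seq INTEGER NOT NULL,
            event_type TEXT NOT NULL,
            payload TEXT NOT NULL,
            created_at TEXT NOT NULL,
            completed_at TEXT,
            event_count INTEGER DEFAULT 1,
            UNIQUE(session_id, seq)
        )
        """
    )
    # Illustrative rows: one compressed content row between two boundary rows.
    rows = [
        ("e1", "s1", 10, "TEXT_MESSAGE_START", "{}", "2026-05-21T10:00:00Z", None, 1),
        ("e2", "s1", 11, "TEXT_MESSAGE_CONTENT", '{"content":"Let me check"}',
         "2026-05-21T10:00:00Z", "2026-05-21T10:00:02Z", 3),
        ("e3", "s1", 12, "TEXT_MESSAGE_END", "{}", "2026-05-21T10:00:02Z", None, 1),
    ]
    conn.executemany(
        "INSERT INTO session_messages VALUES (?, ?, ?, ?, ?, ?, ?, ?)", rows
    )
    return conn
```

Calling `compression_ratio(demo_connection(), "s1")` divides five raw events by three stored rows. Boundary rows are never compressed, so short messages pull the per-session ratio well below the ratio of the content rows alone.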
+ +#### API Endpoints + +``` +GET /api/ambient/v1/sessions/{id}/messages # List compressed events (paginated) +POST /api/ambient/v1/sessions/{id}/messages # Push user message (HTTP; validated as event_type=user) +GET /api/ambient/v1/sessions/{id}/events # SSE proxy to runner pod (live, ephemeral) +``` + +**Query Parameters (GET /messages):** + +| Param | Type | Description | +|-------|------|-------------| +| `after_seq` | int64 | Return events with `seq > after_seq` (for replay/catch-up) | +| `event_type` | string | Filter by event type (e.g., `TOOL_CALL_START`) | +| `limit` | int | Max events to return (default 100, max 1000) | + +**Response (GET /messages):** +```json +{ + "items": [ + { + "id": "01HXY...", + "session_id": "2abc...", + "seq": 42, + "event_type": "TEXT_MESSAGE_CONTENT", + "payload": "{\"content\":\"Let me check the file\"}", + "created_at": "2026-05-21T10:00:00Z", + "completed_at": "2026-05-21T10:00:02Z", + "event_count": 8 + } + ], + "page": 1, + "size": 100, + "total": 523 +} +``` + +#### gRPC Protocol + +Runners push events via `PushSessionMessage`: + +```protobuf +message PushSessionMessageRequest { + string session_id = 1; + string event_type = 2; // AG-UI event type + string payload = 3; // JSON-encoded event payload +} + +message SessionMessage { + string id = 1; + string session_id = 2; + int64 seq = 3; + string event_type = 4; + string payload = 5; + google.protobuf.Timestamp created_at = 6; + google.protobuf.Timestamp completed_at = 7; // NEW + int32 event_count = 8; // NEW +} +``` + +**Compression in gRPC Client:** + +The runner's gRPC client (`ambient-runner` Python package) implements compression **before** calling `PushSessionMessage`. 
The compressor maintains: +- **Context stack** — tracks active message_id, tool_call_id, thinking_id, reasoning_id +- **Accumulation buffer** — collects content/args fragments for current context +- **Flush logic** — detects boundary events and context transitions + +When a flush occurs, the compressor: +1. Concatenates accumulated fragments into a single payload +2. Attaches `event_count` and `completed_at` metadata +3. Calls `PushSessionMessage` once with the compressed event +4. Resets the accumulation buffer + +**Implementation Note:** Compression is **opt-in per runner framework**. Legacy runners continue to push uncompressed events (stored with `event_count=1`). The API server and database accept both formats transparently. + +#### Event Type Mapping (Legacy Compatibility) + +The current `event_type` field in `session_messages` stores simplified event types (`user`, `assistant`, `tool_use`, `tool_result`, `system`, `error`). These are **legacy shims** from the pre-AG-UI era. New implementations use the full AG-UI event type vocabulary. + +**Migration Path:** +1. Extend `event_type` column to accept AG-UI event types (`VARCHAR(255)` already sufficient) +2. Runners emit AG-UI event types verbatim +3. API backward compatibility: map AG-UI types to legacy types for clients expecting old schema + +**Mapping Table:** + +| AG-UI Event Type | Legacy `event_type` | +|------------------|---------------------| +| `TEXT_MESSAGE_START` (role=user) | `user` | +| `TEXT_MESSAGE_START` (role=assistant) | `assistant` | +| `TOOL_CALL_START` | `tool_use` | +| `TOOL_CALL_RESULT` | `tool_result` | +| `RUN_ERROR` | `error` | +| All others | Store verbatim; no legacy mapping | + +**Implementation Status:** ✅ Runners already emit AG-UI event types. API server accepts and stores them verbatim. Legacy mapping is provided for backward compatibility in read queries only (future optional enhancement). 
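The mapping table above translates directly into a lookup. A hedged Python sketch, assuming the `payload` column holds a JSON object and that `TEXT_MESSAGE_START` payloads carry a `role` field as in the compression example; the function name `legacy_event_type` is hypothetical:

```python
import json
from typing import Optional

# Direct AG-UI -> legacy mappings that do not depend on the payload.
_DIRECT = {
    "TOOL_CALL_START": "tool_use",
    "TOOL_CALL_RESULT": "tool_result",
    "RUN_ERROR": "error",
}


def legacy_event_type(event_type: str, payload: str) -> Optional[str]:
    """Map an AG-UI event type to its legacy name, or None if there is none."""
    if event_type in _DIRECT:
        return _DIRECT[event_type]
    if event_type == "TEXT_MESSAGE_START":
        # The legacy type for a text message is decided by the role in the payload.
        role = json.loads(payload).get("role")
        if role in ("user", "assistant"):
            return role
    # All other AG-UI types are stored verbatim and have no legacy equivalent.
    return None
```

Returning `None` rather than echoing the AG-UI type lets a read path decide whether to hide unmapped events from legacy clients or pass them through unchanged.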
+ --- ## ScheduledSession — Recurring Agent Trigger @@ -1243,6 +1479,8 @@ _Last updated: 2026-04-28. Use this as the authoritative index — click into co | **Sessions — start/stop** | ✅ `/start` `/stop` | ✅ `SessionAPI.{Start,Stop}` | ✅ `start`/`stop` commands | | | **Sessions — messages (list/push/watch)** | ✅ `/messages` | ✅ `PushMessage`, `ListMessages`, `WatchSessionMessages` (gRPC) | ✅ `session messages`, `session send` | gRPC watch via `session_watch.go` | | **Sessions — live events (SSE proxy)** | ✅ `/events` → runner pod | ✅ `SessionAPI.StreamEvents` → `io.ReadCloser` | ✅ `session events` | Runner must be Running; 502 if unreachable | +| **Events API — compression** | 🔲 runner gRPC client compression | 🔲 `completed_at`, `event_count` fields | 🔲 migration pending | Compression is opt-in; legacy uncompressed events supported | +| **Events API — AG-UI event types** | ✅ runners emit AG-UI types | ✅ stored verbatim in `event_type` | ✅ query support | Legacy mapping available for backward compat | | **Sessions — labels/annotations** | ✅ PATCH accepts `labels`/`annotations` | ✅ fields on `Session` type; `SessionAPI.Update(patch map[string]any)` | ⚠️ no dedicated subcommand; use `acpctl get session -o json` + manual PATCH | | | **Sessions — workspace files** | ✅ sessions plugin; stubs empty list when no runner; 503 per-file-op | 🔲 | 🔲 `session workspace list/get/put/delete` | Requires running session for file ops | | **Sessions — pre-upload files** | ✅ sessions plugin; stubs empty list when no runner; 503 per-file-op | 🔲 | 🔲 `session files list/upload/delete` | S3-staged; available before session starts | From ed11226010404965e85666b07257abfdd1de85b4 Mon Sep 17 00:00:00 2001 From: Mark Turansky Date: Thu, 21 May 2026 00:23:07 +0000 Subject: [PATCH 2/3] Separate Events API from Messages API with dedicated table MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit BREAKING CHANGE: Events API now uses separate `session_events` table 
## What Changed

### Architectural Separation

**Messages API** (`session_messages` table):
- Purpose: Human-readable conversation summary
- Granularity: Message-level (prompts, replies, tool summaries)
- Audience: End users, conversation history UIs
- Event Types: 6 simplified (user, assistant, tool_use, tool_result, system, error)
- Volume: ~10-100 messages per session
- NO compression needed

**Events API** (`session_events` table - NEW):
- Purpose: Complete AG-UI event audit trail
- Granularity: Token-level (every delta, every event)
- Audience: Developers, debugging, analytics, compliance
- Event Types: 33 AG-UI types (TEXT_MESSAGE_START, TOOL_CALL_ARGS, etc.)
- Volume: ~1,000-20,000 events per session
- Context-aware compression: 5:1 to 20:1 ratios

### Three Event Streams

1. `GET /sessions/{id}/messages` - Messages API (human conversation)
2. `GET /sessions/{id}/events` - Live SSE stream (ephemeral, active sessions)
3. `GET /sessions/{id}/events/history` - Events API (persisted compressed events)

### New Schema

```sql
CREATE TABLE session_events (
  id VARCHAR(36) PRIMARY KEY,
  session_id VARCHAR(36) NOT NULL REFERENCES sessions(id),
  seq BIGINT NOT NULL,
  event_type VARCHAR(255) NOT NULL,
  payload TEXT NOT NULL,
  created_at TIMESTAMPTZ NOT NULL,
  completed_at TIMESTAMPTZ,
  event_count INT DEFAULT 1,
  UNIQUE(session_id, seq)
);
```

### Dual Push Pattern

Runners emit BOTH:
- `PushSessionMessage` (gRPC) → high-level conversation turns
- `PushSessionEvent` (gRPC) → compressed AG-UI events

### ERD Updates

- Added SessionEvent entity
- Kept SessionMessage unchanged for Messages API
- Added Session → SessionEvent relationship

## Rationale

Messages are for humans to read the conversation. Events are for machines to
replay, debug, analyze, and audit. Mixing them in one table creates:
- Storage bloat (thousands of tiny token deltas)
- Query confusion (are we fetching conversation or audit trail?)
- Compression complexity (what to compress vs preserve) Separation provides: - Clear architectural boundaries - Optimized storage per use case - Independent evolution paths Co-Authored-By: Claude Sonnet 4.5 --- specs/api/ambient-model.spec.md | 224 +++++++++++++++++++++++--------- 1 file changed, 160 insertions(+), 64 deletions(-) diff --git a/specs/api/ambient-model.spec.md b/specs/api/ambient-model.spec.md index 1eb79b492..e02eb78ff 100644 --- a/specs/api/ambient-model.spec.md +++ b/specs/api/ambient-model.spec.md @@ -145,12 +145,23 @@ erDiagram time deleted_at } - %% ── SessionMessage (AG-UI event stream — real LLM turns) ───────────────── + %% ── SessionMessage (high-level conversation — human-readable) ──────────── SessionMessage { string ID PK string session_id FK int seq "monotonic within session" + string event_type "user | assistant | tool_use | tool_result | system | error" + string payload "message body or JSON-encoded event" + time created_at + } + + %% ── SessionEvent (comprehensive AG-UI event stream) ─────────────────────── + + SessionEvent { + string ID PK + string session_id FK + int64 seq "monotonic within session; gaps allowed after compression" string event_type "AG-UI event type (33 types: TEXT_MESSAGE_START, TOOL_CALL_START, etc.)" string payload "JSON-encoded event payload" time created_at @@ -247,6 +258,7 @@ erDiagram Inbox }o--o| Agent : "sent_from" Session ||--o{ SessionMessage : "streams" + Session ||--o{ SessionEvent : "emits" Role ||--o{ RoleBinding : "granted_by" ``` @@ -322,26 +334,81 @@ All four are assembled into the start context in that order. Pokes roll downhill --- -## SessionMessage — AG-UI Event Stream +## SessionMessage — High-Level Conversation (Messages API) -SessionMessages are the real LLM conversation. They are appended by the runner via gRPC `PushSessionMessage` and streamed to clients via SSE. +SessionMessages provide a **concise, human-readable** view of the conversation. 
This is the Messages API — prompts, replies, and high-level tool invocations summarized for human consumption. -`seq` is monotonically increasing within a session. `event_type` follows the AG-UI protocol: `user`, `assistant`, `tool_use`, `tool_result`, `system`, `error`. +`seq` is monotonically increasing within a session. `event_type` follows simplified categories: `user`, `assistant`, `tool_use`, `tool_result`, `system`, `error`. -SessionMessages are never deleted or edited. They are the canonical record of what happened in a session. +SessionMessages are never deleted or edited. They represent the conversation summary — what the user asked, what the agent replied, which tools were used. -### Two Event Streams +**Examples:** +- User message: `"Please review the PR and suggest improvements"` +- Assistant message: `"I'll review the pull request. Let me read the files."` +- Tool use: `Read(file_path="src/main.go")` +- Tool result: Summary of file contents + +**REST API:** +``` +GET /api/ambient/v1/sessions/{id}/messages # List conversation messages (paginated) +POST /api/ambient/v1/sessions/{id}/messages # Push user message +``` + +**gRPC:** +``` +rpc PushSessionMessage(PushSessionMessageRequest) returns (SessionMessage) +rpc WatchSessionMessages(WatchSessionMessagesRequest) returns (stream SessionMessage) +``` + +--- + +## SessionEvent — Comprehensive Event Stream (Events API) + +SessionEvents provide the **complete, granular** AG-UI event stream emitted during session execution. This is the Events API — every tool call, every thinking token, every content delta, every state transition. + +`seq` is monotonically increasing within a session (gaps allowed after compression). `event_type` follows the full AG-UI protocol with 33 event types. + +SessionEvents are never deleted or edited. They are the canonical **audit trail** of everything that happened during a session — ideal for debugging, replays, analytics, and compliance. 
+ +**Examples:** +- `RUN_STARTED` — session execution began +- `TEXT_MESSAGE_START` (role=assistant, message_id=msg_abc) — assistant started a message +- `TEXT_MESSAGE_CONTENT` (content="Let me check") — assistant emitted text (compressed from many deltas) +- `TOOL_CALL_START` (tool_name=Read, tool_call_id=tc_123) — tool invocation started +- `TOOL_CALL_ARGS` (args='{"file_path":"/app/main.go"}') — tool arguments (compressed from fragments) +- `TOOL_CALL_END` — tool invocation complete +- `TOOL_CALL_RESULT` (result="package main...") — tool execution result +- `THINKING_TEXT_MESSAGE_CONTENT` — extended thinking content (Claude 4+) +- `REASONING_MESSAGE_CONTENT` — reasoning trace (Gemini Deep Research) +- `RUN_FINISHED` — session execution completed + +### Messages API vs Events API + +| Aspect | Messages API (`session_messages`) | Events API (`session_events`) | +|--------|-----------------------------------|-------------------------------| +| **Purpose** | Human-readable conversation summary | Complete AG-UI event audit trail | +| **Granularity** | Message-level (prompts, replies, tool summaries) | Token-level (every delta, every event) | +| **Audience** | End users, conversation history UIs | Developers, debugging, analytics, compliance | +| **Event Types** | 6 simplified types (user, assistant, tool_use, etc.) | 33 AG-UI event types (TEXT_MESSAGE_START, TOOL_CALL_ARGS, etc.) 
| +| **Volume** | ~10-100 messages per session | ~1,000-20,000 events per session (compressed) | +| **Compression** | No compression needed | Context-aware compression (5:1 to 20:1) | +| **Streaming** | gRPC watch + replay from DB | SSE proxy to runner pod (ephemeral) + persisted compressed events | + +### Three Event Streams | Endpoint | Source | Persistence | Purpose | |---|---|---|---| -| `GET /sessions/{id}/messages` | API server gRPC fan-out | Persisted in DB (replay from `seq=0`) | Durable stream; supports replay and history | -| `GET /sessions/{id}/events` | Runner pod SSE (`GET /events/{thread_id}`) | Ephemeral; runner-local in-memory queue | Live AG-UI turn events during an active run | +| `GET /sessions/{id}/messages` | gRPC `PushSessionMessage` | `session_messages` table | **Messages API** — human-readable conversation | +| `GET /sessions/{id}/events` | Runner pod SSE (`/events/{thread_id}`) | Ephemeral in-memory queue | **Live Events** — real-time AG-UI events during active run | +| `GET /sessions/{id}/events/history` | gRPC `PushSessionEvent` | `session_events` table | **Events API** — complete persisted event audit trail | -The runner's `/events/{thread_id}` endpoint registers an asyncio queue into `bridge._active_streams[thread_id]` and streams every AG-UI event as SSE until `RUN_FINISHED` / `RUN_ERROR` or client disconnect. The API server's `/sessions/{id}/events` proxies this from the runner pod for the active session, routing via pod IP or session service. Keepalive pings fire every 30s to hold the connection open. +The runner's `/events/{thread_id}` endpoint streams live AG-UI events via SSE during an active run. The API server proxies this from the runner pod (`GET /sessions/{id}/events`). These are **ephemeral** — disappear when the session ends. -### Events API — AG-UI Event Protocol +Simultaneously, the runner's gRPC client pushes **compressed events** to `session_events` table for durable storage. 
These power the **Events API** (`GET /sessions/{id}/events/history`) for post-session replay, debugging, and analysis. -The Events API provides structured access to the AG-UI event stream emitted by runner pods during session execution. Events are the atomic units of a conversation: text deltas, tool calls, thinking blocks, state updates, and control flow markers. +### Events API — Storage and Compression + +The Events API stores the complete AG-UI event stream in the `session_events` table. Events are the atomic units of session execution: text deltas, tool calls, thinking blocks, state updates, and control flow markers. #### AG-UI Event Types @@ -444,23 +511,24 @@ Events are compressed **before persistence** by the runner's gRPC client. Compre #### Storage Model -Compressed events are stored in the existing `session_messages` table with schema extensions: +Compressed events are stored in the `session_events` table: ```sql -CREATE TABLE session_messages ( - id VARCHAR(36) PRIMARY KEY, - session_id VARCHAR(36) NOT NULL REFERENCES sessions(id), - seq BIGINT NOT NULL, - event_type VARCHAR(255) NOT NULL, - payload TEXT NOT NULL, - created_at TIMESTAMPTZ NOT NULL, - completed_at TIMESTAMPTZ, -- NEW: timestamp of last event in compressed group - event_count INT DEFAULT 1, -- NEW: number of raw events compressed (1 = uncompressed) +CREATE TABLE session_events ( + id VARCHAR(36) PRIMARY KEY, + session_id VARCHAR(36) NOT NULL REFERENCES sessions(id), + seq BIGINT NOT NULL, + event_type VARCHAR(255) NOT NULL, + payload TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL, + completed_at TIMESTAMPTZ, -- timestamp of last event in compressed group (NULL for uncompressed) + event_count INT DEFAULT 1, -- number of raw events compressed (1 = uncompressed, >1 = compressed) UNIQUE(session_id, seq) ); -CREATE INDEX idx_session_messages_session_id ON session_messages(session_id); -CREATE INDEX idx_session_messages_event_type ON session_messages(event_type); +CREATE INDEX 
idx_session_events_session_id ON session_events(session_id); +CREATE INDEX idx_session_events_event_type ON session_events(event_type); +CREATE INDEX idx_session_events_created_at ON session_events(created_at); ``` **Field Semantics:** @@ -468,31 +536,37 @@ CREATE INDEX idx_session_messages_event_type ON session_messages(event_type); | Field | Description | |-------|-------------| | `seq` | Monotonic sequence within session; gaps allowed after compression | -| `event_type` | AG-UI event type enum (see table above) | +| `event_type` | AG-UI event type enum (33 types: RUN_STARTED, TEXT_MESSAGE_START, TOOL_CALL_ARGS, etc.) | | `payload` | JSON-encoded event payload; structure varies by event type | | `created_at` | First event timestamp (for compressed events) or single event timestamp | | `completed_at` | Last event timestamp for compressed events; `NULL` for uncompressed | | `event_count` | Number of raw events compressed; `1` = uncompressed, `>1` = compressed | -**Backward Compatibility:** Existing rows have `completed_at=NULL` and `event_count=1`. Migration backfills default values without data loss. 
-
 #### API Endpoints

+**Messages API** (human-readable conversation):
 ```
-GET /api/ambient/v1/sessions/{id}/messages # List compressed events (paginated)
+GET /api/ambient/v1/sessions/{id}/messages # List conversation messages (paginated)
 POST /api/ambient/v1/sessions/{id}/messages # Push user message (HTTP; validated as event_type=user)
-GET /api/ambient/v1/sessions/{id}/events # SSE proxy to runner pod (live, ephemeral)
 ```

-**Query Parameters (GET /messages):**
+**Events API** (comprehensive AG-UI event stream):
+```
+GET /api/ambient/v1/sessions/{id}/events # SSE proxy to runner pod (live, ephemeral, active sessions only)
+GET /api/ambient/v1/sessions/{id}/events/history # List persisted compressed events (paginated)
+```
+
+**Query Parameters (GET /events/history):**

 | Param | Type | Description |
 |-------|------|-------------|
 | `after_seq` | int64 | Return events with `seq > after_seq` (for replay/catch-up) |
-| `event_type` | string | Filter by event type (e.g., `TOOL_CALL_START`) |
+| `event_type` | string | Filter by AG-UI event type (e.g., `TOOL_CALL_START`, `TEXT_MESSAGE_CONTENT`) |
 | `limit` | int | Max events to return (default 100, max 1000) |
+| `start_time` | ISO8601 | Filter events created after this timestamp |
+| `end_time` | ISO8601 | Filter events created before this timestamp |

-**Response (GET /messages):**
+**Response (GET /events/history):**
 ```json
 {
   "items": [
@@ -505,23 +579,35 @@ GET /api/ambient/v1/sessions/{id}/events # SSE proxy to runn
       "created_at": "2026-05-21T10:00:00Z",
       "completed_at": "2026-05-21T10:00:02Z",
       "event_count": 8
+    },
+    {
+      "id": "01HXZ...",
+      "session_id": "2abc...",
+      "seq": 43,
+      "event_type": "TOOL_CALL_START",
+      "payload": "{\"tool_name\":\"Read\",\"tool_call_id\":\"tc_123\"}",
+      "created_at": "2026-05-21T10:00:02Z",
+      "completed_at": null,
+      "event_count": 1
     }
   ],
   "page": 1,
   "size": 100,
-  "total": 523
+  "total": 15234
 }
 ```

 #### gRPC Protocol

-Runners push events via `PushSessionMessage`:
-
+**Messages API** (concise conversation):
 ```protobuf
+// Push a human-readable message to the conversation
+rpc PushSessionMessage(PushSessionMessageRequest) returns (SessionMessage)
+
 message PushSessionMessageRequest {
   string session_id = 1;
-  string event_type = 2; // AG-UI event type
-  string payload = 3; // JSON-encoded event payload
+  string event_type = 2; // Simplified: user | assistant | tool_use | tool_result | system | error
+  string payload = 3; // Message body or summary
 }

 message SessionMessage {
@@ -531,14 +617,37 @@ message SessionMessage {
   string event_type = 4;
   string payload = 5;
   google.protobuf.Timestamp created_at = 6;
-  google.protobuf.Timestamp completed_at = 7; // NEW
-  int32 event_count = 8; // NEW
+}
+```
+
+**Events API** (comprehensive AG-UI stream):
+```protobuf
+// Push a compressed AG-UI event to the audit trail
+rpc PushSessionEvent(PushSessionEventRequest) returns (SessionEvent)
+
+message PushSessionEventRequest {
+  string session_id = 1;
+  string event_type = 2; // AG-UI event type (33 types)
+  string payload = 3; // JSON-encoded event payload
+  optional google.protobuf.Timestamp completed_at = 4; // Last event timestamp (for compressed)
+  optional int32 event_count = 5; // Number of events compressed (default 1)
+}
+
+message SessionEvent {
+  string id = 1;
+  string session_id = 2;
+  int64 seq = 3;
+  string event_type = 4;
+  string payload = 5;
+  google.protobuf.Timestamp created_at = 6;
+  optional google.protobuf.Timestamp completed_at = 7;
+  int32 event_count = 8;
 }
 ```

 **Compression in gRPC Client:**

-The runner's gRPC client (`ambient-runner` Python package) implements compression **before** calling `PushSessionMessage`. The compressor maintains:
+The runner's gRPC client (`ambient-runner` Python package) implements compression **before** calling `PushSessionEvent`. The compressor maintains:
 - **Context stack** — tracks active message_id, tool_call_id, thinking_id, reasoning_id
 - **Accumulation buffer** — collects content/args fragments for current context
 - **Flush logic** — detects boundary events and context transitions
@@ -546,32 +655,18 @@ The runner's gRPC client (`ambient-runner` Python package) implements compressio
 When a flush occurs, the compressor:
 1. Concatenates accumulated fragments into a single payload
 2. Attaches `event_count` and `completed_at` metadata
-3. Calls `PushSessionMessage` once with the compressed event
+3. Calls `PushSessionEvent` once with the compressed event
 4. Resets the accumulation buffer

-**Implementation Note:** Compression is **opt-in per runner framework**. Legacy runners continue to push uncompressed events (stored with `event_count=1`). The API server and database accept both formats transparently.
-
-#### Event Type Mapping (Legacy Compatibility)
-
-The current `event_type` field in `session_messages` stores simplified event types (`user`, `assistant`, `tool_use`, `tool_result`, `system`, `error`). These are **legacy shims** from the pre-AG-UI era. New implementations use the full AG-UI event type vocabulary.
-
-**Migration Path:**
-1. Extend `event_type` column to accept AG-UI event types (`VARCHAR(255)` already sufficient)
-2. Runners emit AG-UI event types verbatim
-3. API backward compatibility: map AG-UI types to legacy types for clients expecting old schema
+**Dual Push Pattern:**

-**Mapping Table:**
+Runners emit **both** messages and events:
+- `PushSessionMessage` — high-level conversation turns (user prompts, assistant replies, tool summaries)
+- `PushSessionEvent` — every AG-UI event (text deltas, tool args, thinking tokens, all compressed)

-| AG-UI Event Type | Legacy `event_type` |
-|------------------|---------------------|
-| `TEXT_MESSAGE_START` (role=user) | `user` |
-| `TEXT_MESSAGE_START` (role=assistant) | `assistant` |
-| `TOOL_CALL_START` | `tool_use` |
-| `TOOL_CALL_RESULT` | `tool_result` |
-| `RUN_ERROR` | `error` |
-| All others | Store verbatim; no legacy mapping |
+This provides both human-readable conversation history and complete audit trail.

-**Implementation Status:** ✅ Runners already emit AG-UI event types. API server accepts and stores them verbatim. Legacy mapping is provided for backward compatibility in read queries only (future optional enhancement).
+**Implementation Note:** Compression is **opt-in per runner framework**. Legacy runners can push uncompressed events (stored with `event_count=1`). The API server and database accept both formats transparently.

 ---

@@ -1477,10 +1572,11 @@ _Last updated: 2026-04-28. Use this as the authoritative index — click into co
 |---|---|---|---|---|
 | **Sessions — CRUD** | ✅ | ✅ `SessionAPI.{Get,List,Create,Update,Delete}` | ✅ `get/create/delete session` | |
 | **Sessions — start/stop** | ✅ `/start` `/stop` | ✅ `SessionAPI.{Start,Stop}` | ✅ `start`/`stop` commands | |
-| **Sessions — messages (list/push/watch)** | ✅ `/messages` | ✅ `PushMessage`, `ListMessages`, `WatchSessionMessages` (gRPC) | ✅ `session messages`, `session send` | gRPC watch via `session_watch.go` |
-| **Sessions — live events (SSE proxy)** | ✅ `/events` → runner pod | ✅ `SessionAPI.StreamEvents` → `io.ReadCloser` | ✅ `session events` | Runner must be Running; 502 if unreachable |
-| **Events API — compression** | 🔲 runner gRPC client compression | 🔲 `completed_at`, `event_count` fields | 🔲 migration pending | Compression is opt-in; legacy uncompressed events supported |
-| **Events API — AG-UI event types** | ✅ runners emit AG-UI types | ✅ stored verbatim in `event_type` | ✅ query support | Legacy mapping available for backward compat |
+| **Messages API — list/push/watch** | ✅ `/messages` | ✅ `PushMessage`, `ListMessages`, `WatchSessionMessages` (gRPC) | ✅ `session messages`, `session send` | Human-readable conversation in `session_messages` table |
+| **Events API — live SSE stream** | ✅ `/events` → runner pod SSE | ✅ `SessionAPI.StreamEvents` → `io.ReadCloser` | ✅ `session events` | Ephemeral; runner must be Running; 502 if unreachable |
+| **Events API — persisted history** | 🔲 `/events/history` | 🔲 `ListSessionEvents`, `PushSessionEvent` (gRPC) | 🔲 CLI not yet implemented | New `session_events` table with compression |
+| **Events API — compression** | 🔲 runner gRPC client compressor | 🔲 `completed_at`, `event_count` fields in `SessionEvent` | 🔲 migration pending | Context-aware accumulation; 5:1 to 20:1 compression |
+| **Events API — 33 AG-UI event types** | ✅ runners emit AG-UI types | 🔲 stored in `session_events.event_type` | 🔲 query support pending | TEXT_MESSAGE_START, TOOL_CALL_ARGS, THINKING_*, REASONING_*, etc. |
 | **Sessions — labels/annotations** | ✅ PATCH accepts `labels`/`annotations` | ✅ fields on `Session` type; `SessionAPI.Update(patch map[string]any)` | ⚠️ no dedicated subcommand; use `acpctl get session -o json` + manual PATCH | |
 | **Sessions — workspace files** | ✅ sessions plugin; stubs empty list when no runner; 503 per-file-op | 🔲 | 🔲 `session workspace list/get/put/delete` | Requires running session for file ops |
 | **Sessions — pre-upload files** | ✅ sessions plugin; stubs empty list when no runner; 503 per-file-op | 🔲 | 🔲 `session files list/upload/delete` | S3-staged; available before session starts |
From b47a9521c16b346c16c588a4c3d56ab10da5984c Mon Sep 17 00:00:00 2001
From: Mark Turansky
Date: Thu, 21 May 2026 02:22:11 +0000
Subject: [PATCH 3/3] Address PR review feedback: fix critical issues
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Fixes based on markturansky and CodeRabbit review feedback.

## Critical Fixes

1. **Sequence numbering consistency** (markturansky critical issue):
   - Fixed compression example to show gaps (seq 11 → 14, not 11 → 12)
   - Added note explaining gaps preserve idempotence and prevent race conditions
   - Aligns with schema comment "gaps allowed after compression"

2. **Missing migration section** (markturansky major issue):
   - Added "Migration from Current State" subsection to spec
   - Documents CREATE TABLE for session_events
   - Notes no schema changes needed for session_messages
   - Clarifies backward compatibility and opt-in compression

## CodeRabbit Fixes

3. **Event type conflict clarification**:
   - Explicitly documented 6 SessionMessage event types (legacy)
   - Added clear note these are distinct from AG-UI event types
   - Prevents confusion between Messages API and Events API types

4. **Event naming clarity**:
   - Added note explaining THINKING_TEXT_MESSAGE_CONTENT vs TEXT_MESSAGE_CONTENT
   - Clarifies prefixes indicate semantic context (thinking/reasoning blocks)

5. **gRPC proto field documentation**:
   - Compression metadata fields already present in PushSessionEventRequest
   - Minor comment clarification for completed_at field

All changes maintain backward compatibility and align spec with implementation path.

Co-Authored-By: Claude Sonnet 4.5
---
 specs/api/ambient-model.spec.md | 60 +++++++++++++++++++++++++++++----
 1 file changed, 53 insertions(+), 7 deletions(-)

diff --git a/specs/api/ambient-model.spec.md b/specs/api/ambient-model.spec.md
index e02eb78ff..373446e18 100644
--- a/specs/api/ambient-model.spec.md
+++ b/specs/api/ambient-model.spec.md
@@ -338,7 +338,17 @@ All four are assembled into the start context in that order. Pokes roll downhill

 SessionMessages provide a **concise, human-readable** view of the conversation. This is the Messages API — prompts, replies, and high-level tool invocations summarized for human consumption.

-`seq` is monotonically increasing within a session. `event_type` follows simplified categories: `user`, `assistant`, `tool_use`, `tool_result`, `system`, `error`.
+`seq` is monotonically increasing within a session. `event_type` uses **simplified legacy types** (distinct from AG-UI event types used in SessionEvent):
+
+**Messages API Event Types** (6 types):
+- `user` — User prompt or message
+- `assistant` — Agent reply or response
+- `tool_use` — Tool invocation summary
+- `tool_result` — Tool execution result summary
+- `system` — System notification or status
+- `error` — Error condition
+
+These are **not** AG-UI event types. For the complete AG-UI protocol with 33 granular event types, see SessionEvent below.

 SessionMessages are never deleted or edited. They represent the conversation summary — what the user asked, what the agent replied, which tools were used.
@@ -431,6 +441,8 @@ Each event carries:
 - `thread_id` — session identifier (maps to `session_id` in DB)
 - Payload fields specific to the event type (e.g., `message_id`, `tool_id`, `content`, `args`)

+**Note on Event Naming:** Thinking and Reasoning events are prefixed variants of base text message types. For example, `THINKING_TEXT_MESSAGE_CONTENT` is a distinct event type from `TEXT_MESSAGE_CONTENT`, emitted during extended thinking blocks. The prefixes indicate the semantic context (regular message vs thinking vs reasoning).
+
 **Start/End Pairing:** Events with `_START` / `_END` suffixes define stream boundaries. Content events (`_CONTENT`, `_ARGS`, `_CHUNK`) appear between their corresponding start/end markers.

 **Example sequence:**
@@ -498,16 +510,18 @@ Events are compressed **before persistence** by the runner's gRPC client. Compre
 {"seq":14, "event_type":"TEXT_MESSAGE_END", "payload":"{}"}
 ```

-**After Compression:**
+**After Compression (with gaps):**
 ```json
 {"seq":10, "event_type":"TEXT_MESSAGE_START", "payload":"{\"message_id\":\"msg_1\",\"role\":\"assistant\"}"}
 {"seq":11, "event_type":"TEXT_MESSAGE_CONTENT", "payload":"{\"content\":\"Let me check\"}", "event_count":3, "completed_at":"2026-05-21T..."}
-{"seq":12, "event_type":"TEXT_MESSAGE_END", "payload":"{}"}
+{"seq":14, "event_type":"TEXT_MESSAGE_END", "payload":"{}"}
 ```

+**Note:** Sequence numbers preserve gaps after compression (11 → 14) to avoid renumbering all subsequent events. This makes compression idempotent and prevents race conditions with concurrent event streams.
+
 **Space Savings:** Typical compression ratios range from **5:1** (simple text) to **20:1** (complex tool arguments with many JSON fragments).

-**Backward Compatibility:** Existing queries and APIs continue to work. Compressed events are decompressed on read if clients require token-level replay (future enhancement).
+**Backward Compatibility:** Existing queries and APIs continue to work. Compression is transparent to readers — gaps in `seq` indicate compressed ranges.

 #### Storage Model

@@ -531,6 +545,38 @@ CREATE INDEX idx_session_events_event_type ON session_events(event_type);
 CREATE INDEX idx_session_events_created_at ON session_events(created_at);
 ```

+#### Migration from Current State
+
+**Database Schema Changes** (API server):
+
+1. Create `session_events` table with compression fields:
+   ```sql
+   -- New table creation (no existing data to migrate)
+   CREATE TABLE session_events (
+     id VARCHAR(36) PRIMARY KEY,
+     session_id VARCHAR(36) NOT NULL REFERENCES sessions(id),
+     seq BIGINT NOT NULL,
+     event_type VARCHAR(255) NOT NULL,
+     payload TEXT NOT NULL,
+     created_at TIMESTAMPTZ NOT NULL,
+     completed_at TIMESTAMPTZ,
+     event_count INT DEFAULT 1,
+     UNIQUE(session_id, seq)
+   );
+
+   CREATE INDEX idx_session_events_session_id ON session_events(session_id);
+   CREATE INDEX idx_session_events_event_type ON session_events(event_type);
+   CREATE INDEX idx_session_events_created_at ON session_events(created_at);
+   ```
+
+2. No schema changes required for `session_messages` table (Messages API unchanged).
+
+**Backward Compatibility:**
+- Compression is opt-in at the runner gRPC client level
+- Legacy runners can continue pushing uncompressed events indefinitely (`event_count=1`, `completed_at=NULL`)
+- API server accepts both compressed and uncompressed events transparently
+- Existing `session_messages` table and Messages API remain unchanged
+
 **Field Semantics:**

 | Field | Description |
@@ -627,9 +673,9 @@ rpc PushSessionEvent(PushSessionEventRequest) returns (SessionEvent)

 message PushSessionEventRequest {
   string session_id = 1;
-  string event_type = 2; // AG-UI event type (33 types)
-  string payload = 3; // JSON-encoded event payload
-  optional google.protobuf.Timestamp completed_at = 4; // Last event timestamp (for compressed)
+  string event_type = 2; // AG-UI event type (33 types)
+  string payload = 3; // JSON-encoded event payload
+  optional google.protobuf.Timestamp completed_at = 4; // Last event timestamp (for compressed events)
   optional int32 event_count = 5; // Number of events compressed (default 1)
 }
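
The compression rules this series describes (accumulate `_CONTENT` and `_ARGS` fragments within one context, flush on a boundary event or a context change, keep the first raw `seq` so that gaps remain) can be sketched in Python roughly as follows. This is an illustrative sketch only, not the `ambient-runner` implementation: the `Compressor` class, the `push` callback standing in for the `PushSessionEvent` call, the `content`/`args` fragment keys (taken from the spec's example payloads), and runner-side `seq` values are all assumptions.

```python
import json
from typing import Callable, Optional

# Event-type suffixes whose fragments are accumulated (per the spec's rules).
ACCUMULATED_SUFFIXES = ("_CONTENT", "_ARGS")


class Compressor:
    """Hypothetical sketch of context-aware AG-UI event compression."""

    def __init__(self, push: Callable[[dict], None]) -> None:
        self.push = push  # assumed stand-in for the PushSessionEvent RPC
        self.context_id: Optional[str] = None  # id seen on the last *_START event
        self.buffer: list = []  # raw events waiting for a flush
        self.buffer_key: Optional[tuple] = None

    def feed(self, seq: int, event_type: str, payload: dict, ts: str) -> None:
        if event_type.endswith(ACCUMULATED_SUFFIXES):
            ctx = (payload.get("message_id")
                   or payload.get("tool_call_id")
                   or self.context_id)
            key = (event_type, ctx)
            if self.buffer and key != self.buffer_key:
                self.flush()  # context changed: emit what was accumulated
            self.buffer_key = key
            self.buffer.append((seq, event_type, payload, ts))
            return
        self.flush()  # every other event acts as a boundary
        if event_type.endswith("_START"):
            self.context_id = (payload.get("message_id")
                               or payload.get("tool_call_id"))
        self._emit(seq, event_type, payload, ts, None, 1)

    def flush(self) -> None:
        if not self.buffer:
            return
        seq, event_type, first_payload, created_at = self.buffer[0]
        fragment_key = "content" if event_type.endswith("_CONTENT") else "args"
        merged = dict(first_payload)
        merged[fragment_key] = "".join(
            p.get(fragment_key, "") for _, _, p, _ in self.buffer
        )
        count = len(self.buffer)
        # A single buffered event stays "uncompressed": no completed_at.
        completed_at = self.buffer[-1][3] if count > 1 else None
        self.buffer, self.buffer_key = [], None
        # The first raw seq is kept, so later seq values show a gap.
        self._emit(seq, event_type, merged, created_at, completed_at, count)

    def _emit(self, seq, event_type, payload, created_at, completed_at, count):
        self.push({
            "seq": seq,
            "event_type": event_type,
            "payload": json.dumps(payload),
            "created_at": created_at,
            "completed_at": completed_at,
            "event_count": count,
        })


# Replays the spec's example: seq 10 (start), 11-13 (content), 14 (end).
pushed = []
compressor = Compressor(pushed.append)
compressor.feed(10, "TEXT_MESSAGE_START",
                {"message_id": "msg_1", "role": "assistant"}, "2026-05-21T10:00:00Z")
for raw_seq, text in [(11, "Let"), (12, " me"), (13, " check")]:
    compressor.feed(raw_seq, "TEXT_MESSAGE_CONTENT", {"content": text},
                    "2026-05-21T10:00:01Z")
compressor.feed(14, "TEXT_MESSAGE_END", {}, "2026-05-21T10:00:02Z")
compressor.flush()  # flush any trailing fragments at end of stream
```

Keeping the first raw `seq` of each group, rather than renumbering, is what produces the 11 → 14 gap shown in the "After Compression (with gaps)" example. A real client would also need to handle errors from the push call and flush on stream shutdown, which this sketch only hints at with the final `flush()`.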