diff --git a/specs/api/ambient-model.spec.md b/specs/api/ambient-model.spec.md index eee4c7250..373446e18 100644 --- a/specs/api/ambient-model.spec.md +++ b/specs/api/ambient-model.spec.md @@ -145,7 +145,7 @@ erDiagram time deleted_at } - %% ── SessionMessage (AG-UI event stream — real LLM turns) ───────────────── + %% ── SessionMessage (high-level conversation — human-readable) ──────────── SessionMessage { string ID PK @@ -156,6 +156,19 @@ erDiagram time created_at } + %% ── SessionEvent (comprehensive AG-UI event stream) ─────────────────────── + + SessionEvent { + string ID PK + string session_id FK + int64 seq "monotonic within session; gaps allowed after compression" + string event_type "AG-UI event type (33 types: TEXT_MESSAGE_START, TOOL_CALL_START, etc.)" + string payload "JSON-encoded event payload" + time created_at + time completed_at "nullable — last event timestamp for compressed events" + int32 event_count "number of raw events compressed; 1 = uncompressed" + } + %% ── RBAC ───────────────────────────────────────────────────────────────── Role { @@ -245,6 +258,7 @@ erDiagram Inbox }o--o| Agent : "sent_from" Session ||--o{ SessionMessage : "streams" + Session ||--o{ SessionEvent : "emits" Role ||--o{ RoleBinding : "granted_by" ``` @@ -320,22 +334,385 @@ All four are assembled into the start context in that order. Pokes roll downhill --- -## SessionMessage — AG-UI Event Stream +## SessionMessage — High-Level Conversation (Messages API) + +SessionMessages provide a **concise, human-readable** view of the conversation. This is the Messages API — prompts, replies, and high-level tool invocations summarized for human consumption. + +`seq` is monotonically increasing within a session. 
`event_type` uses **simplified legacy types** (distinct from AG-UI event types used in SessionEvent): + +**Messages API Event Types** (6 types): +- `user` — User prompt or message +- `assistant` — Agent reply or response +- `tool_use` — Tool invocation summary +- `tool_result` — Tool execution result summary +- `system` — System notification or status +- `error` — Error condition + +These are **not** AG-UI event types. For the complete AG-UI protocol with 33 granular event types, see SessionEvent below. + +SessionMessages are never deleted or edited. They represent the conversation summary — what the user asked, what the agent replied, which tools were used. + +**Examples:** +- User message: `"Please review the PR and suggest improvements"` +- Assistant message: `"I'll review the pull request. Let me read the files."` +- Tool use: `Read(file_path="src/main.go")` +- Tool result: Summary of file contents + +**REST API:** +``` +GET /api/ambient/v1/sessions/{id}/messages # List conversation messages (paginated) +POST /api/ambient/v1/sessions/{id}/messages # Push user message +``` + +**gRPC:** +``` +rpc PushSessionMessage(PushSessionMessageRequest) returns (SessionMessage) +rpc WatchSessionMessages(WatchSessionMessagesRequest) returns (stream SessionMessage) +``` + +--- + +## SessionEvent — Comprehensive Event Stream (Events API) + +SessionEvents provide the **complete, granular** AG-UI event stream emitted during session execution. This is the Events API — every tool call, every thinking token, every content delta, every state transition. + +`seq` is monotonically increasing within a session (gaps allowed after compression). `event_type` follows the full AG-UI protocol with 33 event types. + +SessionEvents are never deleted or edited. They are the canonical **audit trail** of everything that happened during a session — ideal for debugging, replays, analytics, and compliance. -SessionMessages are the real LLM conversation. 
They are appended by the runner via gRPC `PushSessionMessage` and streamed to clients via SSE. +**Examples:** +- `RUN_STARTED` — session execution began +- `TEXT_MESSAGE_START` (role=assistant, message_id=msg_abc) — assistant started a message +- `TEXT_MESSAGE_CONTENT` (content="Let me check") — assistant emitted text (compressed from many deltas) +- `TOOL_CALL_START` (tool_name=Read, tool_call_id=tc_123) — tool invocation started +- `TOOL_CALL_ARGS` (args='{"file_path":"/app/main.go"}') — tool arguments (compressed from fragments) +- `TOOL_CALL_END` — tool invocation complete +- `TOOL_CALL_RESULT` (result="package main...") — tool execution result +- `THINKING_TEXT_MESSAGE_CONTENT` — extended thinking content (Claude 4+) +- `REASONING_MESSAGE_CONTENT` — reasoning trace (Gemini Deep Research) +- `RUN_FINISHED` — session execution completed -`seq` is monotonically increasing within a session. `event_type` follows the AG-UI protocol: `user`, `assistant`, `tool_use`, `tool_result`, `system`, `error`. +### Messages API vs Events API -SessionMessages are never deleted or edited. They are the canonical record of what happened in a session. +| Aspect | Messages API (`session_messages`) | Events API (`session_events`) | +|--------|-----------------------------------|-------------------------------| +| **Purpose** | Human-readable conversation summary | Complete AG-UI event audit trail | +| **Granularity** | Message-level (prompts, replies, tool summaries) | Token-level (every delta, every event) | +| **Audience** | End users, conversation history UIs | Developers, debugging, analytics, compliance | +| **Event Types** | 6 simplified types (user, assistant, tool_use, etc.) | 33 AG-UI event types (TEXT_MESSAGE_START, TOOL_CALL_ARGS, etc.) 
| +| **Volume** | ~10-100 messages per session | ~1,000-20,000 events per session (compressed) | +| **Compression** | No compression needed | Context-aware compression (5:1 to 20:1) | +| **Streaming** | gRPC watch + replay from DB | SSE proxy to runner pod (ephemeral) + persisted compressed events | -### Two Event Streams +### Three Event Streams | Endpoint | Source | Persistence | Purpose | |---|---|---|---| -| `GET /sessions/{id}/messages` | API server gRPC fan-out | Persisted in DB (replay from `seq=0`) | Durable stream; supports replay and history | -| `GET /sessions/{id}/events` | Runner pod SSE (`GET /events/{thread_id}`) | Ephemeral; runner-local in-memory queue | Live AG-UI turn events during an active run | +| `GET /sessions/{id}/messages` | gRPC `PushSessionMessage` | `session_messages` table | **Messages API** — human-readable conversation | +| `GET /sessions/{id}/events` | Runner pod SSE (`/events/{thread_id}`) | Ephemeral in-memory queue | **Live Events** — real-time AG-UI events during active run | +| `GET /sessions/{id}/events/history` | gRPC `PushSessionEvent` | `session_events` table | **Events API** — complete persisted event audit trail | + +The runner's `/events/{thread_id}` endpoint streams live AG-UI events via SSE during an active run. The API server proxies this stream from the runner pod (`GET /sessions/{id}/events`). These events are **ephemeral**: they disappear when the session ends. + +Simultaneously, the runner's gRPC client pushes **compressed events** to the `session_events` table for durable storage. These power the **Events API** (`GET /sessions/{id}/events/history`) for post-session replay, debugging, and analysis. + +### Events API — Storage and Compression + +The Events API stores the complete AG-UI event stream in the `session_events` table. Events are the atomic units of session execution: text deltas, tool calls, thinking blocks, state updates, and control flow markers. 
+ +#### AG-UI Event Types + +Events follow the [AG-UI protocol](https://github.com/anthropics/ag-ui), a streaming protocol for agentic UIs. The protocol defines 33 event types organized into semantic categories: + +| Category | Event Types | Purpose | +|----------|-------------|---------| +| **Run Lifecycle** | `RUN_STARTED`, `RUN_FINISHED`, `RUN_ERROR` | Session execution boundaries | +| **Step Lifecycle** | `STEP_STARTED`, `STEP_FINISHED` | Multi-step execution boundaries (LangGraph pattern) | +| **Text Messages** | `TEXT_MESSAGE_START`, `TEXT_MESSAGE_CONTENT`, `TEXT_MESSAGE_END`, `TEXT_MESSAGE_CHUNK` | User or assistant text content | +| **Tool Calls** | `TOOL_CALL_START`, `TOOL_CALL_ARGS`, `TOOL_CALL_END`, `TOOL_CALL_CHUNK`, `TOOL_CALL_RESULT` | Tool invocations and results | +| **Thinking** | `THINKING_START`, `THINKING_END`, `THINKING_TEXT_MESSAGE_START`, `THINKING_TEXT_MESSAGE_CONTENT`, `THINKING_TEXT_MESSAGE_END` | Extended thinking blocks (Claude 4+ models) | +| **Reasoning** | `REASONING_START`, `REASONING_END`, `REASONING_MESSAGE_START`, `REASONING_MESSAGE_CONTENT`, `REASONING_MESSAGE_END`, `REASONING_MESSAGE_CHUNK`, `REASONING_ENCRYPTED_VALUE` | Reasoning trace (Gemini 2.5+ Deep Research) | +| **State** | `STATE_SNAPSHOT`, `STATE_DELTA`, `MESSAGES_SNAPSHOT`, `ACTIVITY_SNAPSHOT`, `ACTIVITY_DELTA` | Bidirectional state sync (LangGraph pattern) | +| **Custom** | `RAW`, `CUSTOM` | Framework-specific or debug events | + +Each event carries: +- `type` — event type from the enum above +- `run_id` — AG-UI run identifier (scoped to a single execution turn) +- `thread_id` — session identifier (maps to `session_id` in DB) +- Payload fields specific to the event type (e.g., `message_id`, `tool_call_id`, `content`, `args`) + +**Note on Event Naming:** Thinking and Reasoning events are prefixed variants of base text message types. For example, `THINKING_TEXT_MESSAGE_CONTENT` is a distinct event type from `TEXT_MESSAGE_CONTENT`, emitted during extended thinking blocks. 
The prefixes indicate the semantic context (regular message vs thinking vs reasoning). + +**Start/End Pairing:** Events with `_START` / `_END` suffixes define stream boundaries. Content events (`_CONTENT`, `_ARGS`, `_CHUNK`) appear between their corresponding start/end markers. + +**Example sequence:** +``` +RUN_STARTED +├── TEXT_MESSAGE_START (role=assistant, message_id=msg_abc) +│ ├── TEXT_MESSAGE_CONTENT (content="Let me") +│ ├── TEXT_MESSAGE_CONTENT (content=" check") +│ └── TEXT_MESSAGE_END +├── TOOL_CALL_START (tool_name=Read, tool_call_id=tc_123) +│ ├── TOOL_CALL_ARGS (args='{"file') +│ ├── TOOL_CALL_ARGS (args='_path":') +│ ├── TOOL_CALL_ARGS (args='"/app/file.txt"}') +│ └── TOOL_CALL_END +├── TOOL_CALL_RESULT (tool_call_id=tc_123, result="file contents...") +└── RUN_FINISHED +``` + +#### Event Compression + +AG-UI events stream at **token-level granularity** — a single word or JSON fragment can emit one event. Without compression, sessions generate thousands of tiny rows (e.g., `TEXT_MESSAGE_CONTENT` with `"Let"`, then `" me"`, then `" check"`). This creates storage bloat and query overhead. + +**Compression Strategy — Context-Aware Accumulation:** + +Events are compressed **before persistence** by the runner's gRPC client. Compression groups consecutive events sharing the same **context** (message_id, tool_call_id, role). When the context changes or a boundary event arrives, the accumulated content is flushed as a single compressed event. 
+ +**Compression Rules:** + +| Event Type | Compression Behavior | +|------------|---------------------| +| `TEXT_MESSAGE_START` | **Boundary** — flushes prior accumulated content; starts new message context | +| `TEXT_MESSAGE_CONTENT` | **Accumulate** — append `content` to buffer within current message context | +| `TEXT_MESSAGE_END` | **Boundary** — flushes accumulated content; ends message context | +| `TOOL_CALL_START` | **Boundary** — starts new tool call context | +| `TOOL_CALL_ARGS` | **Accumulate** — append `args` fragment to buffer within current tool context | +| `TOOL_CALL_END` | **Boundary** — flushes accumulated args; ends tool context | +| `THINKING_TEXT_MESSAGE_CONTENT` | **Accumulate** — within thinking message context | +| `REASONING_MESSAGE_CONTENT` | **Accumulate** — within reasoning message context | +| All `_START`, `_END`, `_RESULT`, run/step lifecycle | **Never compressed** — stored as individual events | + +**Context Definition:** +- Text messages: `(message_id, role)` +- Tool calls: `(tool_call_id)` +- Thinking: `(message_id, thinking_id)` +- Reasoning: `(message_id, reasoning_id)` + +**Flush Triggers:** +1. Context change (new message_id / tool_call_id) +2. Boundary event (`_START`, `_END`) +3. Event type transition (TEXT → TOOL, TOOL → TEXT) +4. Buffer size threshold (optional; e.g., 10 KB per compressed event) +5. 
Time threshold (optional; e.g., 5 seconds idle) + +**Metadata Preservation:** +- `created_at` — timestamp of the **first** event in the compressed group +- `completed_at` — timestamp of the **last** event (new field on `SessionEvent`) +- `event_count` — number of raw events compressed into this row (new field on `SessionEvent`) + +**Example — Before Compression:** +```json +{"seq":10, "event_type":"TEXT_MESSAGE_START", "payload":"{\"message_id\":\"msg_1\",\"role\":\"assistant\"}"} +{"seq":11, "event_type":"TEXT_MESSAGE_CONTENT", "payload":"{\"content\":\"Let\"}"} +{"seq":12, "event_type":"TEXT_MESSAGE_CONTENT", "payload":"{\"content\":\" me\"}"} +{"seq":13, "event_type":"TEXT_MESSAGE_CONTENT", "payload":"{\"content\":\" check\"}"} +{"seq":14, "event_type":"TEXT_MESSAGE_END", "payload":"{}"} +``` + +**After Compression (with gaps):** +```json +{"seq":10, "event_type":"TEXT_MESSAGE_START", "payload":"{\"message_id\":\"msg_1\",\"role\":\"assistant\"}"} +{"seq":11, "event_type":"TEXT_MESSAGE_CONTENT", "payload":"{\"content\":\"Let me check\"}", "event_count":3, "completed_at":"2026-05-21T..."} +{"seq":14, "event_type":"TEXT_MESSAGE_END", "payload":"{}"} +``` + +**Note:** Sequence numbers preserve gaps after compression (11 → 14) to avoid renumbering all subsequent events. This makes compression idempotent and prevents race conditions with concurrent event streams. + +**Space Savings:** Typical compression ratios range from **5:1** (simple text) to **20:1** (complex tool arguments with many JSON fragments). + +**Backward Compatibility:** Existing queries and APIs continue to work. Compression is transparent to readers — gaps in `seq` indicate compressed ranges. 
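
The accumulation rules and the Before/After example above can be sketched roughly as follows. This is a minimal illustration, not the `ambient-runner` implementation: the `compress` function, the `ACCUMULATE_FIELD` table, and the event dict shape (`seq`, `event_type`, `payload`, `created_at`) are assumptions made for the example. The optional size and time thresholds are omitted, and the context key is reduced to the event type plus any `message_id` / `tool_call_id` found in the payload.

```python
import json

# Accumulable event types and the payload field whose fragments are concatenated.
# (Hypothetical table for this sketch; the spec only lists the behaviors.)
ACCUMULATE_FIELD = {
    "TEXT_MESSAGE_CONTENT": "content",
    "TOOL_CALL_ARGS": "args",
    "THINKING_TEXT_MESSAGE_CONTENT": "content",
    "REASONING_MESSAGE_CONTENT": "content",
}


def compress(events):
    """Collapse runs of accumulable events that share a context into single rows."""
    out = []
    buf = None  # compressed row under construction, or None when idle

    def flush():
        nonlocal buf
        if buf is None:
            return
        parts = buf.pop("_parts")
        base = buf.pop("_base")
        buf.pop("_context")
        field = ACCUMULATE_FIELD[buf["event_type"]]
        merged = {**base, field: "".join(parts)}
        buf["payload"] = json.dumps(merged, separators=(",", ":"))
        if buf["event_count"] == 1:
            buf["completed_at"] = None  # a single event counts as uncompressed
        out.append(buf)
        buf = None

    for ev in events:
        field = ACCUMULATE_FIELD.get(ev["event_type"])
        if field is None:
            # Boundary event: flush the buffer, then store the event as its own row.
            flush()
            out.append({**ev, "completed_at": None, "event_count": 1})
            continue
        payload = json.loads(ev["payload"])
        context = (ev["event_type"], payload.get("message_id"), payload.get("tool_call_id"))
        if buf is not None and buf["_context"] != context:
            flush()  # context change or event type transition
        if buf is None:
            buf = {
                "seq": ev["seq"],                # keep the first seq; later ones become gaps
                "event_type": ev["event_type"],
                "created_at": ev["created_at"],  # timestamp of the first event
                "completed_at": None,
                "event_count": 0,
                "_parts": [], "_base": payload, "_context": context,
            }
        buf["_parts"].append(payload.get(field, ""))
        buf["event_count"] += 1
        buf["completed_at"] = ev["created_at"]   # timestamp of the last event so far

    flush()
    return out
```

Fed the five raw events from the example (seq 10 through 14), this sketch yields three rows with `seq` 10, 11, and 14, where the middle row carries the concatenated content and `event_count` of 3. Keeping the first `seq` of each group is what produces the gaps without renumbering later events.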
+ +#### Storage Model + +Compressed events are stored in the `session_events` table: + +```sql +CREATE TABLE session_events ( + id VARCHAR(36) PRIMARY KEY, + session_id VARCHAR(36) NOT NULL REFERENCES sessions(id), + seq BIGINT NOT NULL, + event_type VARCHAR(255) NOT NULL, + payload TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL, + completed_at TIMESTAMPTZ, -- timestamp of last event in compressed group (NULL for uncompressed) + event_count INT DEFAULT 1, -- number of raw events compressed (1 = uncompressed, >1 = compressed) + UNIQUE(session_id, seq) +); + +CREATE INDEX idx_session_events_session_id ON session_events(session_id); +CREATE INDEX idx_session_events_event_type ON session_events(event_type); +CREATE INDEX idx_session_events_created_at ON session_events(created_at); +``` + +#### Migration from Current State + +**Database Schema Changes** (API server): + +1. Create `session_events` table with compression fields: + ```sql + -- New table creation (no existing data to migrate) + CREATE TABLE session_events ( + id VARCHAR(36) PRIMARY KEY, + session_id VARCHAR(36) NOT NULL REFERENCES sessions(id), + seq BIGINT NOT NULL, + event_type VARCHAR(255) NOT NULL, + payload TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL, + completed_at TIMESTAMPTZ, + event_count INT DEFAULT 1, + UNIQUE(session_id, seq) + ); + + CREATE INDEX idx_session_events_session_id ON session_events(session_id); + CREATE INDEX idx_session_events_event_type ON session_events(event_type); + CREATE INDEX idx_session_events_created_at ON session_events(created_at); + ``` + +2. No schema changes required for `session_messages` table (Messages API unchanged). 
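
As an illustration of how a reader might cope with gaps in `seq` under the `session_events` schema above, the sketch below resumes from a last-seen sequence number and recovers the raw event total from `event_count`. It uses an in-memory SQLite database as a stand-in with simplified column types; `events_after` and `raw_event_total` are hypothetical helper names, not part of the API server.

```python
import sqlite3

# SQLite stand-in for the session_events table (column types simplified for the sketch).
SCHEMA = """
CREATE TABLE session_events (
    id           TEXT PRIMARY KEY,
    session_id   TEXT NOT NULL,
    seq          INTEGER NOT NULL,
    event_type   TEXT NOT NULL,
    payload      TEXT NOT NULL,
    created_at   TEXT NOT NULL,
    completed_at TEXT,
    event_count  INTEGER DEFAULT 1,
    UNIQUE(session_id, seq)
);
"""


def events_after(conn, session_id, last_seen_seq, limit=100):
    """Return up to `limit` rows with seq greater than `last_seen_seq`, in seq order."""
    cur = conn.execute(
        "SELECT seq, event_type, event_count FROM session_events "
        "WHERE session_id = ? AND seq > ? ORDER BY seq LIMIT ?",
        (session_id, last_seen_seq, limit),
    )
    return cur.fetchall()


def raw_event_total(conn, session_id):
    """Number of raw events represented by the stored rows (sum of event_count)."""
    row = conn.execute(
        "SELECT COALESCE(SUM(event_count), 0) FROM session_events WHERE session_id = ?",
        (session_id,),
    ).fetchone()
    return row[0]
```

Because the comparison is `seq > last_seen_seq` rather than `seq = last_seen_seq + 1`, a reader that last saw `seq` 11 picks up at 14 without treating the gap as missing data.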
+ +**Backward Compatibility:** +- Compression is opt-in at the runner gRPC client level +- Legacy runners can continue pushing uncompressed events indefinitely (`event_count=1`, `completed_at=NULL`) +- API server accepts both compressed and uncompressed events transparently +- Existing `session_messages` table and Messages API remain unchanged + +**Field Semantics:** + +| Field | Description | +|-------|-------------| +| `seq` | Monotonic sequence within session; gaps allowed after compression | +| `event_type` | AG-UI event type enum (33 types: RUN_STARTED, TEXT_MESSAGE_START, TOOL_CALL_ARGS, etc.) | +| `payload` | JSON-encoded event payload; structure varies by event type | +| `created_at` | First event timestamp (for compressed events) or single event timestamp | +| `completed_at` | Last event timestamp for compressed events; `NULL` for uncompressed | +| `event_count` | Number of raw events compressed; `1` = uncompressed, `>1` = compressed | + +#### API Endpoints + +**Messages API** (human-readable conversation): +``` +GET /api/ambient/v1/sessions/{id}/messages # List conversation messages (paginated) +POST /api/ambient/v1/sessions/{id}/messages # Push user message (HTTP; validated as event_type=user) +``` + +**Events API** (comprehensive AG-UI event stream): +``` +GET /api/ambient/v1/sessions/{id}/events # SSE proxy to runner pod (live, ephemeral, active sessions only) +GET /api/ambient/v1/sessions/{id}/events/history # List persisted compressed events (paginated) +``` + +**Query Parameters (GET /events/history):** + +| Param | Type | Description | +|-------|------|-------------| +| `after_seq` | int64 | Return events with `seq > after_seq` (for replay/catch-up) | +| `event_type` | string | Filter by AG-UI event type (e.g., `TOOL_CALL_START`, `TEXT_MESSAGE_CONTENT`) | +| `limit` | int | Max events to return (default 100, max 1000) | +| `start_time` | ISO8601 | Filter events created after this timestamp | +| `end_time` | ISO8601 | Filter events created before 
this timestamp | + +**Response (GET /events/history):** +```json +{ + "items": [ + { + "id": "01HXY...", + "session_id": "2abc...", + "seq": 42, + "event_type": "TEXT_MESSAGE_CONTENT", + "payload": "{\"content\":\"Let me check the file\"}", + "created_at": "2026-05-21T10:00:00Z", + "completed_at": "2026-05-21T10:00:02Z", + "event_count": 8 + }, + { + "id": "01HXZ...", + "session_id": "2abc...", + "seq": 43, + "event_type": "TOOL_CALL_START", + "payload": "{\"tool_name\":\"Read\",\"tool_call_id\":\"tc_123\"}", + "created_at": "2026-05-21T10:00:02Z", + "completed_at": null, + "event_count": 1 + } + ], + "page": 1, + "size": 100, + "total": 15234 +} +``` + +#### gRPC Protocol + +**Messages API** (concise conversation): +```protobuf +// Push a human-readable message to the conversation +rpc PushSessionMessage(PushSessionMessageRequest) returns (SessionMessage) + +message PushSessionMessageRequest { + string session_id = 1; + string event_type = 2; // Simplified: user | assistant | tool_use | tool_result | system | error + string payload = 3; // Message body or summary +} + +message SessionMessage { + string id = 1; + string session_id = 2; + int64 seq = 3; + string event_type = 4; + string payload = 5; + google.protobuf.Timestamp created_at = 6; +} +``` + +**Events API** (comprehensive AG-UI stream): +```protobuf +// Push a compressed AG-UI event to the audit trail +rpc PushSessionEvent(PushSessionEventRequest) returns (SessionEvent) + +message PushSessionEventRequest { + string session_id = 1; + string event_type = 2; // AG-UI event type (33 types) + string payload = 3; // JSON-encoded event payload + optional google.protobuf.Timestamp completed_at = 4; // Last event timestamp (for compressed events) + optional int32 event_count = 5; // Number of events compressed (default 1) +} + +message SessionEvent { + string id = 1; + string session_id = 2; + int64 seq = 3; + string event_type = 4; + string payload = 5; + google.protobuf.Timestamp created_at = 6; + optional 
google.protobuf.Timestamp completed_at = 7; + int32 event_count = 8; +} +``` + +**Compression in gRPC Client:** + +The runner's gRPC client (`ambient-runner` Python package) implements compression **before** calling `PushSessionEvent`. The compressor maintains: +- **Context stack** — tracks active message_id, tool_call_id, thinking_id, reasoning_id +- **Accumulation buffer** — collects content/args fragments for current context +- **Flush logic** — detects boundary events and context transitions + +When a flush occurs, the compressor: +1. Concatenates accumulated fragments into a single payload +2. Attaches `event_count` and `completed_at` metadata +3. Calls `PushSessionEvent` once with the compressed event +4. Resets the accumulation buffer + +**Dual Push Pattern:** + +Runners emit **both** messages and events: +- `PushSessionMessage` — high-level conversation turns (user prompts, assistant replies, tool summaries) +- `PushSessionEvent` — every AG-UI event (text deltas, tool args, thinking tokens, all compressed) + +This provides both human-readable conversation history and complete audit trail. -The runner's `/events/{thread_id}` endpoint registers an asyncio queue into `bridge._active_streams[thread_id]` and streams every AG-UI event as SSE until `RUN_FINISHED` / `RUN_ERROR` or client disconnect. The API server's `/sessions/{id}/events` proxies this from the runner pod for the active session, routing via pod IP or session service. Keepalive pings fire every 30s to hold the connection open. +**Implementation Note:** Compression is **opt-in per runner framework**. Legacy runners can push uncompressed events (stored with `event_count=1`). The API server and database accept both formats transparently. --- @@ -1241,8 +1618,11 @@ _Last updated: 2026-04-28. 
Use this as the authoritative index — click into co |---|---|---|---|---| | **Sessions — CRUD** | ✅ | ✅ `SessionAPI.{Get,List,Create,Update,Delete}` | ✅ `get/create/delete session` | | | **Sessions — start/stop** | ✅ `/start` `/stop` | ✅ `SessionAPI.{Start,Stop}` | ✅ `start`/`stop` commands | | -| **Sessions — messages (list/push/watch)** | ✅ `/messages` | ✅ `PushMessage`, `ListMessages`, `WatchSessionMessages` (gRPC) | ✅ `session messages`, `session send` | gRPC watch via `session_watch.go` | -| **Sessions — live events (SSE proxy)** | ✅ `/events` → runner pod | ✅ `SessionAPI.StreamEvents` → `io.ReadCloser` | ✅ `session events` | Runner must be Running; 502 if unreachable | +| **Messages API — list/push/watch** | ✅ `/messages` | ✅ `PushMessage`, `ListMessages`, `WatchSessionMessages` (gRPC) | ✅ `session messages`, `session send` | Human-readable conversation in `session_messages` table | +| **Events API — live SSE stream** | ✅ `/events` → runner pod SSE | ✅ `SessionAPI.StreamEvents` → `io.ReadCloser` | ✅ `session events` | Ephemeral; runner must be Running; 502 if unreachable | +| **Events API — persisted history** | 🔲 `/events/history` | 🔲 `ListSessionEvents`, `PushSessionEvent` (gRPC) | 🔲 CLI not yet implemented | New `session_events` table with compression | +| **Events API — compression** | 🔲 runner gRPC client compressor | 🔲 `completed_at`, `event_count` fields in `SessionEvent` | 🔲 migration pending | Context-aware accumulation; 5:1 to 20:1 compression | +| **Events API — 33 AG-UI event types** | ✅ runners emit AG-UI types | 🔲 stored in `session_events.event_type` | 🔲 query support pending | TEXT_MESSAGE_START, TOOL_CALL_ARGS, THINKING_*, REASONING_*, etc. 
| | **Sessions — labels/annotations** | ✅ PATCH accepts `labels`/`annotations` | ✅ fields on `Session` type; `SessionAPI.Update(patch map[string]any)` | ⚠️ no dedicated subcommand; use `acpctl get session -o json` + manual PATCH | | | **Sessions — workspace files** | ✅ sessions plugin; stubs empty list when no runner; 503 per-file-op | 🔲 | 🔲 `session workspace list/get/put/delete` | Requires running session for file ops | | **Sessions — pre-upload files** | ✅ sessions plugin; stubs empty list when no runner; 503 per-file-op | 🔲 | 🔲 `session files list/upload/delete` | S3-staged; available before session starts |