From e8b52bf446d4ac831418491147be54bf38f904b7 Mon Sep 17 00:00:00 2001 From: Robert Hodges Date: Tue, 21 Apr 2026 20:21:08 -0700 Subject: [PATCH 1/5] Add directory for antalya docs, skills, and designs Signed-off-by: Robert Hodges --- antalya/README.md | 9 + .../alter-table-export-part-partition.md | 596 ++++++++++++++++++ antalya/skills/README.md | 86 +++ .../skills/antalya-feature-design/SKILL.md | 132 ++++ .../assets/design-template.md | 133 ++++ 5 files changed, 956 insertions(+) create mode 100644 antalya/README.md create mode 100644 antalya/docs/design/alter-table-export-part-partition.md create mode 100644 antalya/skills/README.md create mode 100644 antalya/skills/antalya-feature-design/SKILL.md create mode 100644 antalya/skills/antalya-feature-design/assets/design-template.md diff --git a/antalya/README.md b/antalya/README.md new file mode 100644 index 000000000000..4dc8767e840b --- /dev/null +++ b/antalya/README.md @@ -0,0 +1,9 @@ +# Antalya Extensions + +Location of Antalya-specific designs, documentation, and resources like +agent skills used for development. + +## Structure + +Echo structure of the repo root. For example, designs go in +antalya/docs/designs. Product documentation goes antalya/docs/en/antalya. diff --git a/antalya/docs/design/alter-table-export-part-partition.md b/antalya/docs/design/alter-table-export-part-partition.md new file mode 100644 index 000000000000..d21704f6af00 --- /dev/null +++ b/antalya/docs/design/alter-table-export-part-partition.md @@ -0,0 +1,596 @@ +Feature Design: `ALTER TABLE EXPORT PART` and `ALTER TABLE EXPORT PARTITION` +============================================================================ + +**Status:** draft +**Author(s):** Arthur Passos +**Related issues/PRs:** +https://github.com/Altinity/ClickHouse/pull/1618 + +**Last updated:** 2026-04-21 + +--- + +## 1. Requirements + +### Motivation + +Cost of storing data is a growing problem for large analytic systems +that use open source ClickHouse and replicated block storage. The core +problem is that block storage is (a) expensive and (b) replication makes +multiple copies. Project Antalya solves the storage cost problem using +the Hybrid Table Engine. Hybrid tables allow users to split tables +into segments, placing hot data on replicated block storage and +cold data on shared Iceberg tables using Parquet data files. + +The hybrid table approach requires a robust mechanism to export table +data from MergeTree tables to Iceberg. The mechanism must be fast, +use machine resources efficiently, handle failures automatically, and +be easy to monitor. This design covers two new ClickHouse commands to +export data. + +* `ALTER TABLE EXPORT PART` -- Exports a single part to Iceberg + +* `ALTER TABLE EXPORT PARTITION` -- Exports one or more partitions to Iceberg + +These commands replace `INSERT INTO ... SELECT FROM` pipelines that select +rows and write them out to one or more Parquet files. This approach +costs an extra decode/sort pass per export, does not coordinate across +replicas, and does not take advantage of existing partitioning and +sorting in MergeTree. `EXPORT PART` / `EXPORT PARTITION` write parts +directly to object storage from the source replica(s), preserving the +source sort order and cutting out the `SELECT` pipeline. + +### Requirements + +1. **SQL only.** All operations related to export are available in SQL. + There should be no need to use non-SQL tools or directly access + storage to run exports or clean up problems. + +2. 
**Efficient, order-preserving writes.** Write a specified `MergeTree` + part (or every part of a specified partition) to an object-storage + destination in Parquet, preserving the source part's sort order, + without using a `SELECT` pass and also minimizing the RAM required + to hold data during transfer. + +3. **Output file management.** Allow users to break exported parts + into smaller Parquet files, which helps ensure good performance + when scanning Iceberg data. + +4. **Data type equivalence.** Map ClickHouse types to Iceberg types + that cast back without data loss to the original ClickHouse types + when selecting data. Applications that access exported data through + a Hybrid table should be able to read the data back from Iceberg + without requiring changes. + +5. **Atomic transfer.** Readers should never see a partial export in + the target Iceberg table. Exported part(s) and partition(s) should + be visible in their entirety in Iceberg or not at all. + +6. **Distributed operation.** + - `EXPORT PART` always runs locally on the ClickHouse host where + it is invoked. + - `EXPORT PARTITION` from non-replicated `MergeTree` tables runs + locally on the ClickHouse host where it is invoked. + - `EXPORT PARTITION` is cluster-coordinated on `Replicated*MergeTree` + tables: any replica that has a given part contributes to the + export; the task is persistent and resumes after restarts. + +7. **Observability.** + It must be possible for users to track the following from system tables: + - Export part request status. + - Export partition request status. + - History of exported parts including whether export succeeded or failed. + - Relevant profile events related to export. + +8. **Error recovery.** + - **Idempotence.** Re-issuing the same export to the same + destination is a no-op while the export is running. (There should + be a way to track 'recent' exports so that they are idempotent as + well.) + - **Clean-up.** There must be a procedure to clean up a failed + export using only SQL commands. + - **Automatic restart.** `EXPORT PARTITION` task is persistent and + resumes after restarts. + +9. **Killable.** It must be possible to terminate any `ALTER TABLE EXPORT` command. + The command should be idempotent and must throw a clear exception on failure + rather than hanging. + +### Non-requirements + +- Non-Parquet output formats. Only `Parquet` is targeted in this iteration. +- `partition_strategy = 'wildcard'` destinations — only `'hive'` is supported; others throw + `NOT_IMPLEMENTED`. +- Exporting to arbitrary table functions. Only those backed by an object-storage engine that + supports exports (e.g. `s3`, `azure`) are valid; others throw `NOT_IMPLEMENTED`. +- Schema evolution between source and destination. Schemas must match (`INCOMPATIBLE_COLUMNS` + otherwise); destination cannot have a column that matches a source `EPHEMERAL`. +- Any read/query path over exported files — consumption happens via normal `S3` / `s3` / + external-engine reads. +- Synchronous exports. Both commands return immediately; completion is polled via system tables. +- Importing parts back from object storage (that is tracked separately). + +### Constraints + +- Experimental gate: `allow_experimental_export_merge_tree_part` (query-level) for `EXPORT PART`; + `enable_experimental_export_merge_tree_partition_feature` (server-level) for `EXPORT PARTITION`. +- `EXPORT PARTITION` requires a ZooKeeper / `clickhouse-keeper` ensemble with the `multi_read` + feature flag. 
+- Destination table must use `partition_strategy = 'hive'`. +- No change to `MergeTree` on-disk part format; only the Keeper schema under the table's + replication path is extended. + +### References + +- `docs/en/engines/table-engines/mergetree-family/part_export.md` +- `docs/en/engines/table-engines/mergetree-family/partition_export.md` +- `tests/queries/0_stateless/03572_export_merge_tree_part_basic.sh` +- `tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.sql` +- `tests/queries/0_stateless/03572_export_merge_tree_part_limits_and_table_functions.sh` +- `tests/queries/0_stateless/03572_export_merge_tree_part_special_columns.sh` +- `tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage.sh` +- `tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.sql` +- `tests/queries/0_stateless/03604_export_merge_tree_partition.sh` +- `tests/queries/0_stateless/03608_export_merge_tree_part_filename_pattern.sh` +- `tests/integration/test_export_merge_tree_part_to_object_storage/test.py` +- `tests/integration/test_export_replicated_mt_partition_to_object_storage/test.py` + +--- + +## 2. Functional specification + +### User-facing behavior +A user points `ALTER TABLE` at a `MergeTree` source and an object-storage destination (table or +table function). The command returns immediately with no rows. The export runs in the background; +progress lives in `system.exports` and, for partition exports, `system.replicated_partition_exports`. +Successful exports append to `system.part_log` with `event_type = 'ExportPart'`. Writers to +object storage drop one Parquet data file per part (or per chunk when split by size/rows) plus +one commit file per transaction. + +### SQL syntax / API + +```sql +-- Export a single part to a destination table +ALTER TABLE [db.]table + EXPORT PART 'part_name' + TO TABLE [dest_db.]dest_table + [SETTINGS ...]; + +-- Export a single part to a destination table function +ALTER TABLE [db.]table + EXPORT PART 'part_name' + TO TABLE FUNCTION s3(...) PARTITION BY + [SETTINGS ...]; + +-- Export every active part of a partition (Replicated*MergeTree only) +ALTER TABLE [db.]table + EXPORT PARTITION ID 'partition_id' + TO TABLE [dest_db.]dest_table + [SETTINGS ...]; + +-- Cancel one or more partition exports +KILL EXPORT PARTITION WHERE ; +``` + +### Individual command examples +These are derived from `tests/queries/0_stateless/03572_*` and `03604_export_merge_tree_partition.sh`. 
+ +```sql +-- Part export to S3 table +ALTER TABLE mt_table EXPORT PART '2020_1_1_0' TO TABLE s3_table +SETTINGS allow_experimental_export_merge_tree_part = 1; + +-- Part export to S3 table function (schema inferred from source) +ALTER TABLE mt_table EXPORT PART '2020_1_1_0' +TO TABLE FUNCTION s3(s3_conn, filename='tf', format='Parquet', partition_strategy='hive') +PARTITION BY year +SETTINGS allow_experimental_export_merge_tree_part = 1; + +-- Split large part across multiple Parquet files +ALTER TABLE big EXPORT PART '2025_0_32_3' TO TABLE big_dest +SETTINGS allow_experimental_export_merge_tree_part = 1, + export_merge_tree_part_max_bytes_per_file = 10000000, + output_format_parquet_row_group_size_bytes = 5000000; + +-- Partition export across a Replicated cluster +ALTER TABLE rmt_table EXPORT PARTITION ID '2020' TO TABLE s3_table; + +-- Cancel by filter +KILL EXPORT PARTITION +WHERE partition_id = '2020' + AND source_table = 'rmt_table' + AND destination_table = 's3_table'; +``` + +### End-to-end example + +Create a `ReplicatedMergeTree` source, seed two partitions from +`system.numbers`, create a hive-partitioned S3 destination (the on-disk +shape an external Iceberg catalog such as Glue / REST / Nessie / +Lakekeeper registers as an Iceberg table), export one partition, +and verify: + +```sql +-- 1. Source table: Replicated MergeTree partitioned by year. +CREATE TABLE events +( + id UInt64, + ts DateTime, + year UInt16 +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/events', 'r1') +PARTITION BY year +ORDER BY (year, id); + +-- 2. Seed two partitions (2024, 2025) straight from `system.numbers`. +INSERT INTO events +SELECT + number AS id, + toDateTime('2024-01-01 00:00:00') + INTERVAL number SECOND AS ts, + 2024 AS year +FROM system.numbers +LIMIT 1000000; + +INSERT INTO events +SELECT + number AS id, + toDateTime('2025-01-01 00:00:00') + INTERVAL number SECOND AS ts, + 2025 AS year +FROM system.numbers +LIMIT 1000000; + +-- 3. Destination: S3 with hive partition layout. This is the on-disk shape that +-- an Iceberg catalog registers as a table; the `EXPORT PARTITION` command +-- itself does not touch any catalog. +CREATE TABLE events_iceberg +( + id UInt64, + ts DateTime, + year UInt16 +) +ENGINE = S3(s3_conn, filename='warehouse/events', format = Parquet, partition_strategy = 'hive') +PARTITION BY year; + +-- 4. Export the 2024 partition. Returns immediately; runs in the background. +ALTER TABLE events EXPORT PARTITION ID '2024' TO TABLE events_iceberg; + +-- 5. Watch progress (Keeper round-trip — use sparingly). +SELECT status, parts_count, parts_to_do, last_exception +FROM system.replicated_partition_exports +WHERE source_table = 'events' AND partition_id = '2024'; + +-- 6. When status = 'COMPLETED', the destination bucket contains: +-- warehouse/events/year=2024/_.1.parquet (one per part) +-- warehouse/events/commit_2024_ (atomicity manifest) +-- Readers that filter by commit see either the full partition or nothing. +SELECT count() FROM events_iceberg WHERE year = 2024; + +-- 7. Or inspect the layout directly. +SELECT _path +FROM s3(s3_conn, filename = 'warehouse/events/year=2024/**', format = 'One') +ORDER BY _path; +``` + +The same flow works for a non-replicated `MergeTree` source — coordination collapses to the +local node, but the command, destination shape, and observability surfaces are identical. + +### Operational notes + +The following notes expand on expected behavior of commands. + +1. 
`ALTER TABLE t EXPORT PART 'p' TO TABLE s3_t` writes + `//_.1.parquet` plus + `/commit__`, readable end-to-end via + `SELECT * FROM s3(...)` in tests `03572_*` and `03608_*`. + +2. `ALTER TABLE rmt EXPORT PARTITION ID 'p' TO TABLE s3_t` exports + every active part of partition `p` across all replicas that host + it; `system.replicated_partition_exports` converges to `COMPLETED`. + +3. Re-issuing the same `EXPORT PARTITION` within + `export_merge_tree_partition_manifest_ttl` is a no-op (no + duplicate files) unless `export_merge_tree_partition_force_export = 1`. + +4. Killing an in-flight partition export via `KILL EXPORT PARTITION` + transitions status to `KILLED` and stops all replicas' contributions. + +5. Exception during part export is counted in `PartsExportFailures`; + retry behavior honors `export_merge_tree_partition_max_retries`. + +### Settings + +| Setting | Scope | Default | Range / Values | Applies to | Description | +| --- | --- | --- | --- | --- | --- | +| `allow_experimental_export_merge_tree_part` | query | `false` | `Bool` | `EXPORT PART` | Experimental gate; required. | +| `enable_experimental_export_merge_tree_partition_feature` | server | `false` | `Bool` | `EXPORT PARTITION` | Experimental gate; required. | +| `export_merge_tree_part_overwrite_file_if_exists` | query | `false` | `Bool` | `EXPORT PART` | Overwrite existing destination file; otherwise throws. | +| `export_merge_tree_part_max_bytes_per_file` | query | `0` | `UInt64` (`0`=unlimited) | both | Soft cap per output file. Non-zero values can break idempotency. | +| `export_merge_tree_part_max_rows_per_file` | query | `0` | `UInt64` (`0`=unlimited) | both | Soft cap per output file. Non-zero values can break idempotency. | +| `export_merge_tree_part_throw_on_pending_mutations` | query | `true` | `Bool` | both | Refuse to export parts with pending mutations (unless mutation was `IN PARTITION`). | +| `export_merge_tree_part_throw_on_pending_patch_parts` | query | `true` | `Bool` | both | Refuse to export parts with pending patch parts. | +| `export_merge_tree_part_filename_pattern` | query | `{part_name}_{checksum}` | `String` | both | Filename template; supports `{part_name}`, `{checksum}`, `{database}`, `{table}`, server macros. | +| `export_merge_tree_partition_force_export` | query | `false` | `Bool` | `EXPORT PARTITION` | Overwrite a live Keeper manifest for the same `(source, destination, partition_id)`. | +| `export_merge_tree_partition_max_retries` | query | `3` | `UInt64` | `EXPORT PARTITION` | Per-part retry budget before the partition export fails. | +| `export_merge_tree_partition_manifest_ttl` | query | `180` (seconds) | `UInt64` | `EXPORT PARTITION` | Live-manifest TTL; acts as the idempotency window. Does not interrupt in-flight tasks. | +| `export_merge_tree_part_file_already_exists_policy` | query | `skip` | `skip` / `error` / `overwrite` | `EXPORT PARTITION` | Per-file policy during partition export. | + +Default-value impact: all new settings default to "off" or to conservative +values (pending-mutation guards default to throwing). No existing query +behavior changes unless a user opts in. + +### System tables / metrics / log messages / observability + +- `system.exports` — rows for currently-executing part exports (source/destination tables, + `part_name`, destination paths, `elapsed`, rows/bytes counters, memory counters). Dropped when + the export completes. +- `system.replicated_partition_exports` — rows for `EXPORT PARTITION` tasks. 
Backed by Keeper; + querying it is a Keeper round-trip and should be used sparingly. Columns include + `source_database`, `source_table`, `destination_database`, `destination_table`, `create_time`, + `partition_id`, `transaction_id`, `source_replica`, `parts`, `parts_count`, `parts_to_do`, + `status`, `exception_replica`, `last_exception`, `exception_part`, `exception_count`. +- `system.part_log` — each completed part export appends one row with `event_type = 'ExportPart'`, + filled `remote_file_paths`, `merged_from = [part_name]`, plus standard timing / size / error + fields. +- `ProfileEvents`: + - `PartsExports` — successful part-export completions. + - `PartsExportFailures` — failures. + - `PartsExportDuplicated` — skipped because destination file already existed. + - `PartsExportTotalMilliseconds` — cumulative wall time. +- Status enum on `system.replicated_partition_exports`: `PENDING`, `COMPLETED`, `FAILED`, `KILLED`. + +### Error behavior + +- Missing experimental flag: `SUPPORT_IS_DISABLED` (exact code TBD — confirm against + `src/Common/ErrorCodes.cpp`). +- Destination schema mismatch (columns / types / order; source `EPHEMERAL` column present in + destination): `INCOMPATIBLE_COLUMNS`. +- Destination engine doesn't support exports (e.g. `url`, non-hive `partition_strategy`, unknown + engine): `NOT_IMPLEMENTED`. +- Destination is an unknown table function: `UNKNOWN_FUNCTION`. +- Pending mutations or patch parts when the guard is enabled: `BAD_ARGUMENTS` (TBD — confirm). +- Part not found on any replica: `NO_SUCH_DATA_PART` (TBD — confirm). +- Destination file already exists and policy is `error` / `export_merge_tree_part_overwrite_file_if_exists = 0`: + `FILE_ALREADY_EXISTS` (TBD — confirm). +- Duplicate live manifest without `..._force_export`: `DUPLICATE_EXPORT_TASK` or equivalent + (TBD — confirm). + +All of the above **throw exceptions**; they do not crash the server. + +### Backward compatibility +- **Older client → newer server:** harmless — the client issues the new `ALTER` text; the server + parses it. No wire-protocol change. +- **Newer client → older server:** older server fails parse on `EXPORT PART` / `EXPORT PARTITION` + with `SYNTAX_ERROR`. Acceptable. +- **Mixed-version cluster (replication):** `EXPORT PART` is local to one replica; no cross-replica + effect. `EXPORT PARTITION` stores its manifest under the table's Keeper path in a new + subtree; replicas on older versions ignore unknown nodes but will NOT contribute parts to the + export — the initiating replica (which must be on the new version) completes alone if it holds + all the parts; otherwise the task stalls on `parts_to_do > 0`. An upgrade-ordering note for + operators is required (section 4 / Rollout). +- **On-disk format:** unchanged. Parts are read as-is; Parquet is produced on the fly. +- **Default-value changes:** none that affect existing workloads (see settings table). + +--- + +## 3. Implementation + +### Architecture overview +Parser adds two new `ASTAlterCommand` variants (`EXPORT_PART`, `EXPORT_PARTITION`) plus +`KILL EXPORT PARTITION`. The interpreter side routes `EXPORT PART` into a new per-part export +task scheduled on the background export pool; `EXPORT PARTITION` is routed through Keeper for +coordination and expands into N per-part tasks across the replicas that host each part. + +Part-level pipeline: reuse the existing Parquet output stack (`StorageS3Sink` / Parquet writer) +fed by a source that streams a single part in primary-key order, with no post-read sort or merge +pass. 
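
To make the contrast concrete, here is a minimal sketch of the pipeline the export path replaces
(table and part names are illustrative, not taken from the tests above):

```sql
-- Today's workaround: full SELECT pipeline, including a re-sort of rows that
-- are already sorted inside the part.
INSERT INTO s3_dest
SELECT *
FROM mt_source
WHERE _part = '2024_1_1_0'
ORDER BY id;

-- Export path: streams the part in its on-disk order straight into the
-- Parquet writer; no SELECT / sort / merge pass.
ALTER TABLE mt_source EXPORT PART '2024_1_1_0' TO TABLE s3_dest
SETTINGS allow_experimental_export_merge_tree_part = 1;
```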
+ +Partition coordination: Keeper nodes under `/partition_exports//` hold the +manifest (parts list, policy), per-part assignment, per-replica progress, and the kill flag. +Each replica watches `partition_exports` and picks up parts it holds locally. + +### Key design decisions + +1. **Separate AST nodes for each command.** `EXPORT PART` and `EXPORT PARTITION` get distinct + `ASTAlterCommand::Type` variants; `KILL EXPORT PARTITION` is its own `ASTKillExportPartitionQuery`. + The part primitive and the cluster-coordinated partition command do materially different work + and should not share a single code path. + +2. **Reuse the existing object-storage sink.** Export rides on the destination engine's existing + Parquet writer (`StorageS3` / `StorageAzureBlob` sink). No new encoder. The sink is extended + to accept an already-ordered stream and a target filename derived from + `export_merge_tree_part_filename_pattern`. + +3. **Stream parts in primary-key order, no re-`SELECT`.** The per-part reader walks the part in + its on-disk order and feeds the Parquet writer directly, skipping the analyzer / planner / + executor decode and sort path that `INSERT INTO ... SELECT` would take. This is the central + performance claim. + +4. **Coordinate partition exports via a dedicated Keeper subtree.** New path + `/partition_exports//` holds the manifest, per-part assignment, per-replica + progress, and the kill flag — separate from the replication log. The replication log is for + data mutations that must apply on every replica; partition exports are a distributed + side-effect whose assignment depends on which replica holds which part, so they warrant their + own subtree. + +5. **Atomicity via commit files.** Each transaction emits one `commit__` (part + level) or `commit__` (partition level) file that lists every data file + written. Readers wanting atomicity filter by commit; this avoids on-target renames or + multipart-transaction protocols on object storage. + +6. **Async model with three observability surfaces.** Commands return immediately. In-flight + progress lives in `system.exports` (local, dropped on completion); + `system.replicated_partition_exports` (Keeper-backed — querying is a Keeper round-trip, use + sparingly); and `system.part_log` gains an `ExportPart` `event_type` for completed per-part + exports. Four `ProfileEvents` (`PartsExports`, `PartsExportFailures`, `PartsExportDuplicated`, + `PartsExportTotalMilliseconds`) expose aggregate counters. + +7. **Idempotency enforced in Keeper.** Duplicate `(source, destination, partition_id)` submissions + are refused while the manifest is live. The manifest TTL + (`export_merge_tree_partition_manifest_ttl`, default 180s) defines the idempotency window; it + does NOT terminate in-flight tasks. `export_merge_tree_partition_force_export = 1` overrides. + +8. **Two experimental gates, asymmetric scope.** `EXPORT PART` is gated query-level + (`allow_experimental_export_merge_tree_part`) — individual users can try it. `EXPORT PARTITION` + is gated server-level (`enable_experimental_export_merge_tree_partition_feature`) because it + writes to Keeper and engages cluster coordination — rollout is an operator decision, not a + per-query one. + +### Concurrency / locking + +- Per-part export holds the part's `DataPartStorage` lock for the duration of the read (same + guarantees as a merge/mutation read). 
+- Partition-export coordinator uses Keeper multi-transactions to (a) claim a part, (b) record + progress, (c) decrement `parts_to_do`, (d) transition status. +- No server-wide lock. Background export pool size is bounded (reuse the existing + `background_pool_size` knob or add a dedicated one — TBD). +- Idempotency against duplicate submission is enforced at the Keeper manifest level (unique + `(source, destination, partition_id)` while manifest is live). + +### Storage format changes + +- **On-disk parts:** unchanged. +- **Keeper:** adds `/partition_exports/` subtree. Older servers ignore unknown + Keeper children; no schema version bump required but coordinator code MUST be tolerant of + concurrent removal by another version (TBD — verify). +- **Object-storage layout:** `//..` plus + `/commit_` (part-level) or + `/commit__` (partition-level). Readers that don't understand + commit files will see the data files directly — this is acceptable for non-atomic readers but + callers wanting atomicity MUST filter by commit. + +### Performance + +- Hot path: Parquet encoding of a single part. No extra `SELECT` / sort / merge pass vs. + `INSERT ... SELECT` baseline — that is the expected win. +- Memory: one Parquet writer per concurrent export; row-group buffering bounded by + `output_format_parquet_row_group_size_bytes`. +- I/O: one network write stream per output file; chunked when + `export_merge_tree_part_max_bytes_per_file` / `_max_rows_per_file` set. +- Benchmark coverage: TBD — propose a `tests/performance/export_merge_tree_part.xml` comparing + `EXPORT PART` vs. `INSERT INTO s3_t SELECT FROM mt_t WHERE _part = ...` over a ~1 GB part. + +### Alternatives considered + +1. `INSERT INTO s3_t SELECT FROM mt_t WHERE _part = 'p'` — today's workaround. Rejected + because it runs the full `SELECT` pipeline (decode, potential re-sort, distribute) per export, + has no cross-replica coordination, and no native commit-file atomicity. +2. **Synchronous `ALTER ... EXPORT PART` that blocks the client** — rejected; partition exports + can run for hours and the HTTP / native session would time out. Async + system tables mirrors + `ALTER ... MUTATE` and is already familiar. +3. **Non-replicated `EXPORT PARTITION` (per-replica, uncoordinated)** — rejected because + duplicates and split-brain are the default outcome when every replica independently exports + the parts it holds. +4. **Queue the partition export in the existing replication log** — rejected; the replication + log is for *data* mutations that must apply on every replica, whereas partition exports are + a distributed *side-effect* whose assignment depends on which replica holds which part. + Separate Keeper subtree is cleaner. +5. **Non-Keeper coordination (leader replica drives everything)** — rejected; would require a + new leader-election path and wouldn't survive leader restart without a Keeper-backed manifest + anyway. + +### Open questions + +- Exact error codes for each failure class above — confirm names in + `src/Common/ErrorCodes.cpp` during prototype. +- Whether `EXPORT PART` should refuse to run against `Replicated*MergeTree` (forcing users to + `EXPORT PARTITION`) or remain allowed as the primitive it clearly is. Current tests allow + both; this should be documented explicitly. +- Dedicated background pool for exports vs. reuse of existing `background_move_pool_size` / + similar — TBD. 
+- Behaviour of `EXPORT PARTITION` when initiating replica dies mid-task: the manifest persists + in Keeper, but does a surviving replica take over as "source replica"? Current docs say "task + is persistent" — clarify recovery semantics. +- Is the manifest TTL enforced by the initiator or by a cluster-wide cleanup job? Affects what + happens when the initiator is offline. + +--- + +## 4. Test plan + +### Functional tests — `tests/queries/0_stateless` + +Existing coverage to retain: + +- `03572_export_merge_tree_part_basic.sh` — golden path, idempotent re-export, wildcard + + hive partition strategies. +- `03572_export_merge_tree_part_to_object_storage_simple.sql` — error cases + (`INCOMPATIBLE_COLUMNS`, `NOT_IMPLEMENTED`, `UNKNOWN_FUNCTION`, `EPHEMERAL` collision). +- `03572_export_merge_tree_part_limits_and_table_functions.sh` — `max_bytes_per_file`, + `max_rows_per_file`, table-function destination with schema inheritance / explicit structure. +- `03572_export_merge_tree_part_special_columns.sh` — `ALIAS`, `MATERIALIZED`, `EPHEMERAL`, + mixed / complex expressions. +- `03572_export_replicated_merge_tree_part_to_object_storage.sh` + + `03572_export_replicated_merge_tree_part_to_object_storage_simple.sql` — part-level export + from `ReplicatedMergeTree`. +- `03604_export_merge_tree_partition.sh` — basic `EXPORT PARTITION ID`. +- `03608_export_merge_tree_part_filename_pattern.sh` — default and custom + `export_merge_tree_part_filename_pattern` including `{database}` / `{table}` macros. + +New tests to add: + +- `NNNN_export_merge_tree_part_pending_mutations.sh` — with/without the + `..._throw_on_pending_mutations` / `..._throw_on_pending_patch_parts` guards and `IN PARTITION` + mutations. +- `NNNN_export_merge_tree_part_commit_file.sh` — verify a `commit__` file exists + alongside every successful export and references every written data file; verify that a + partial run (simulated by killing before commit) does not produce a commit file. +- `NNNN_export_merge_tree_part_overwrite_policy.sh` — all three values of + `export_merge_tree_part_file_already_exists_policy` (`skip`, `error`, `overwrite`) plus + `export_merge_tree_part_overwrite_file_if_exists`. +- `NNNN_export_merge_tree_part_profile_events.sh` — assert `PartsExports`, + `PartsExportFailures`, `PartsExportDuplicated`, `PartsExportTotalMilliseconds` move as + expected. + +Do not add `no-parallel` to any new test unless explicitly required by shared S3 bucket paths; +`03604` currently has the tag and should be re-examined to see whether unique per-run paths +remove the need. + +### Integration tests — `tests/integration` + +Keep: + +- `test_export_merge_tree_part_to_object_storage/` — part export in a multi-node setup. +- `test_export_replicated_mt_partition_to_object_storage/` — partition export across replicas, + including `wait_for_export_status`, retry counting, and replica failure scenarios. + +Add: + +- A case where the initiating replica dies mid-partition-export and a surviving replica must + complete the task (covers the open question above). +- A case where the experimental feature is disabled on one replica + (`disable_experimental_export_partition.xml` config) and enabled on the rest — confirm the + task still completes via the enabled replicas. +- `KILL EXPORT PARTITION` transitions status to `KILLED` and leaves no orphan in-flight writer. +- Mixed-version cluster: upgrade scenario where only some replicas know about `EXPORT PARTITION`. 
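
A hedged sketch of the SQL the kill-path case would drive (identifiers are illustrative; the
actual assertions live in the Python test):

```sql
-- Start a partition export, then kill it mid-flight.
ALTER TABLE rmt_table EXPORT PARTITION ID '2020' TO TABLE s3_table;

KILL EXPORT PARTITION
WHERE partition_id = '2020'
  AND source_table = 'rmt_table'
  AND destination_table = 's3_table';

-- Expect the task to converge to KILLED with no further part progress ...
SELECT status, parts_to_do, last_exception
FROM system.replicated_partition_exports
WHERE partition_id = '2020' AND source_table = 'rmt_table';

-- ... and no in-flight per-part writers left behind (column names per the
-- observability section above; the exact schema may differ).
SELECT count() FROM system.exports;
```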
+ +Invocation: `python -m ci.praktika run "integration" --test test_export_merge_tree_part_to_object_storage,test_export_replicated_mt_partition_to_object_storage`. + +### Performance tests — `tests/performance` + +Add `export_merge_tree_part.xml`: compare `ALTER TABLE ... EXPORT PART` vs. +`INSERT INTO s3_t SELECT * FROM mt WHERE _part = ...` on a ~1 GB Wide part; track wall time and +peak memory. Hot path is the Parquet encoder, which warrants a guard against regressions. + +### Manual verification + +- Roundtrip: export partition → read via `SELECT * FROM s3(...)` → create new + `ReplicatedMergeTree` from the S3 data → row counts / checksums match the source. +- `system.replicated_partition_exports` behaviour under a crashed initiator (cluster restart). +- Object-storage layout inspection via `s3(..., format=One)` listing: exactly N data files + 1 + commit file per transaction. + +### Rollout / risk + +- **Risk:** Keeper schema extension is write-once; a partially-rolled-out cluster where only + some replicas understand the `partition_exports` subtree will stall partition exports + (`parts_to_do > 0`) rather than corrupt data. Acceptable but must be documented in the upgrade + notes. +- **Risk:** object-storage cost / accidental large exports. Mitigated by the experimental gate + (default off) and the manifest idempotency window. +- **Flag strategy:** ship with `allow_experimental_export_merge_tree_part` (query, default + `false`) and `enable_experimental_export_merge_tree_partition_feature` (server, default + `false`). Flip defaults to `true` only after: (a) the open questions above are resolved, (b) + the new functional / integration tests land, (c) one release cycle of customer feedback. +- **Watch in production:** `PartsExportFailures`, `exception_count` on + `system.replicated_partition_exports`, Keeper watch counts under the `partition_exports` + subtree, and object-storage request-error rates. diff --git a/antalya/skills/README.md b/antalya/skills/README.md new file mode 100644 index 000000000000..d04d20e3a434 --- /dev/null +++ b/antalya/skills/README.md @@ -0,0 +1,86 @@ +# Antalya skills + +Source-of-truth for project-specific Claude Code skills. Each subdirectory is +one skill, laid out per the Claude Code skills spec: + +``` +skills/ +├── README.md (this file) +└── / + ├── SKILL.md (required — YAML frontmatter + body) + └── assets/ | scripts/ | references/ (optional bundled resources) +``` + +Skills in this directory are **not** auto-discovered. Claude Code looks for +skills in `.claude/skills/` (and a few other locations — see below). To make +a skill here usable, link it into a `.claude/skills/` directory. + +## Linking a skill into `.claude/skills/` + +From the repo root (`Altinity-ClickHouse/`), create a symlink from +`.claude/skills/` to the corresponding directory under +`antalya/skills/`: + +```bash +cd /path/to/Altinity-ClickHouse +mkdir -p .claude/skills +ln -s ../../antalya/skills/antalya-feature-design .claude/skills/antalya-feature-design +``` + +The link target is relative so the repo is portable across checkout locations. +Verify it resolves: + +```bash +ls -l .claude/skills/antalya-feature-design/SKILL.md +# should print the SKILL.md path with no "No such file" error +``` + +Claude Code will pick the skill up on its next start. Inside a session, type +`/skills` to confirm it's listed, or just trigger it by describing a matching +task. 
+ +## Where else `.claude/skills/` is honored + +Claude Code scans these locations for skills, in order: + +- `/.claude/skills/` +- Any `.claude/skills/` in ancestor directories of `` +- `~/.claude/skills/` (user-global) +- Installed plugins + +For this repo, linking into the top-level `Altinity-ClickHouse/.claude/skills/` +(next to `CLAUDE.md`) covers every working directory under the repo, including +all worktrees. Linking into a specific worktree's `.claude/skills/` (for +example `antalya/.claude/skills/`) scopes the skill to that worktree. + +Prefer the top-level link unless a skill is genuinely worktree-specific. + +## Adding a new skill + +1. Create `antalya/skills//SKILL.md` with the required frontmatter: + ```markdown + --- + name: + description: + --- + + # Body... + ``` +2. Bundle any supporting files under `assets/`, `scripts/`, or `references/` + inside the skill directory. +3. Link it into `.claude/skills/` as shown above. +4. Commit both the skill source (under `antalya/skills/`) and the symlink + (under `.claude/skills/`) so other contributors pick it up. + +For guidance on writing a good SKILL.md — especially the description field, +which drives triggering — see the upstream skill-creator docs. + +## Removing or renaming + +Delete or update both the source directory and its symlink. A broken symlink +under `.claude/skills/` will produce a load-time warning from Claude Code. + +## Current skills + +- **antalya-feature-design** — scaffold or review a ClickHouse / Antalya + feature design document before implementation. diff --git a/antalya/skills/antalya-feature-design/SKILL.md b/antalya/skills/antalya-feature-design/SKILL.md new file mode 100644 index 000000000000..cef1afa1b659 --- /dev/null +++ b/antalya/skills/antalya-feature-design/SKILL.md @@ -0,0 +1,132 @@ +--- +name: antalya-feature-design +description: Scaffold or review a feature design document for ClickHouse / Antalya. Use this whenever a developer wants to design or implement a new feature, add a SQL function, add a setting, add a new engine or format, or change server behavior in a non-trivial way — even if they don't explicitly ask for a "design doc". Also use when reviewing an existing design before implementation starts. +--- + +# Antalya Feature Design + +Help the developer produce a feature design for ClickHouse / Antalya before code is +written. A good design shortens review, prevents abandoned branches, and surfaces compatibility +landmines before they become migrations. + +## When to create vs. review + +- **Create** when the developer is starting new work and has no design yet, or only a rough idea. +- **Review** when the developer hands you an existing design (file path, paste, or PR link) and + asks for feedback, a critique, or to "check" it. + +If unclear, ask one question: "Are we drafting a new design, or reviewing an existing one?" + +--- + +## Creating a design + +### 1. Ask a few questions first (don't guess) + +The template has four sections. Before filling them in, confirm the basics — a wrong premise wastes +more time than a short interview: + +- **What problem is being solved?** Push for a concrete workload or user report, not "users want X". +- **What's the rough shape?** New SQL syntax? A new setting? A new engine/format? A behavior change? +- **Any hard constraints?** Upstream ClickHouse parity, on-disk compatibility, a specific release + train, a customer deadline. 
- **Where should the design file live?** Default to `docs/design/.md` relative to the
  repo root, but let the developer override.

If the developer has already answered these in conversation, skip ahead — don't re-interview them.

### 2. Copy the template, then fill it in

The template lives at `assets/design-template.md` next to this `SKILL.md`. Copy it to the chosen
path, then populate each section based on the conversation. Leave a section marked `TBD` with a
pointed question rather than inventing content — a visible gap is more useful than a plausible
fabrication.

### 3. Conventions to apply while writing

These match the project `CLAUDE.md` and make the design consistent with the rest of the codebase:

- Wrap SQL keywords, function names, class names, setting names, engine names, and literal log
  message excerpts in inline code blocks: `MergeTree`, `SELECT`, `max_threads`, `system.errors`.
- Refer to functions as `f` (not `f()`) when naming the function itself rather than a call.
- Say "throws an exception" rather than "crashes" for logical errors — release builds don't crash
  on `LOGICAL_ERROR`.
- Cite files as `path/to/file.cpp:line` so reviewers can jump directly.
- Prefer one-line statements of intent over prose padding.

### 4. Push back on weak spots as you write

Drafting is also a review — don't wait for section 4 to think. In particular:

- If the **Motivation** reduces to "users want X" with no concrete workload, ask for one.
- If **Requirements** are not measurable, say so and suggest a measurable form.
- If there are no **Non-requirements**, propose a few. The absence of non-requirements is the
  single biggest source of scope creep.
- If **Alternatives considered** is empty, press for at least one rejected approach. A design with
  no alternatives considered usually means the author has a solution looking for a problem.

---

## Reviewing a design

Read the design end-to-end first, then walk the checklist below. Report findings grouped by
severity: **blocking** (must fix before implementation), **should-address** (fix before merge),
**nit** (optional). Quote the exact text you're critiquing so the author can find it fast.

### Requirements
- Motivation cites a concrete workload, incident, or user report — not a generic assertion.
- Requirements are measurable. "Faster" is not a requirement; "query `Q` drops from 5s to <500ms
  on dataset `D`" is.
- Non-requirements are listed. Missing non-requirements almost always produce scope creep in review.

### Functional specification
- Every new/changed SQL syntax has at least one concrete example.
- Every new setting has scope (server / user / query), default, and valid range.
- Default-value changes that alter behavior for existing workloads are called out explicitly.
- Error behavior specifies the error code, not just "throws an error".
- Backward compatibility covers: older client → newer server, newer client → older server,
  mixed-version cluster, on-disk format. If any of these is genuinely N/A, the design should say
  so — silence is not the same as "not applicable".
- Formatting convention: inline code blocks around SQL identifiers, engine names, settings.

### Implementation
- Architecture section names the subsystems touched (parser / analyzer / planner / executor /
  storage / replication / keeper). If it touches many, that's a design smell worth flagging.
- New abstractions pull their weight. 
If an interface has one implementation and no foreseeable + second, flag it — per project `CLAUDE.md`, three similar lines beat a premature abstraction. +- Storage format changes include a migration path and version-bump strategy. +- Performance section identifies the benchmark that will cover the hot path. Hand-wavy "shouldn't + be a regression" is not enough for hot-path code. +- Alternatives considered contains at least one rejected approach with a reason. A design with no + rejected alternatives is under-explored. +- Error handling lives at boundaries (user input, external systems), not scattered through + internal code that already trusts its callers. + +### Test plan +- Golden path, edge cases, and error cases are all enumerated — not just "add tests". +- Tests use `tests/queries/0_stateless` for functional coverage and `tests/integration` for + anything touching replication, keeper, distributed queries, S3, auth, or Kafka. +- No `no-*` tags (especially `no-parallel`) unless there's a stated reason they're necessary. +- New tests are proposed as new files, not extensions of existing ones. +- Integration-test invocation is specified: + `python -m ci.praktika run "integration" --test `. +- Performance tests exist if the feature is in the hot path. +- Rollout section names the specific risk and whether a feature flag (and default) is warranted. + +### Cross-cutting red flags +- Feature flags or backwards-compatibility shims added "just in case" — per project `CLAUDE.md`, + prefer changing the code directly when you can. +- Comments in the proposed implementation that restate what the code does, or name the current + task/PR/issue — those belong in commit messages and PR descriptions, not in source. +- Unfinished sections left as `TBD` with no owner or question — either resolve or annotate with + the specific decision needed and who owns it. + +--- + +## Output etiquette + +- When creating, write the file and then tell the developer which sections need their input (the + `TBD` items) rather than showing the whole template in chat. +- When reviewing, lead with a one-line verdict ("ready to implement", "needs revisions in + sections 2 and 4", "fundamental rethink needed"), then the grouped findings. +- Keep the review terse. Block quotes from the design plus one-sentence critiques beat long prose. diff --git a/antalya/skills/antalya-feature-design/assets/design-template.md b/antalya/skills/antalya-feature-design/assets/design-template.md new file mode 100644 index 000000000000..171e0f5c64df --- /dev/null +++ b/antalya/skills/antalya-feature-design/assets/design-template.md @@ -0,0 +1,133 @@ +# Feature Design: [Feature Name] + +**Status:** draft | under review | accepted | implemented +**Author(s):** +**Reviewers:** +**Related issues/PRs:** +**Last updated:** + +--- + +## 1. Requirements + +### Motivation +What problem does this feature solve? Why now? Quote user reports, incidents, or concrete +workloads where possible — avoid generic statements like "users want X". + +### Requirements +- A series of numbered requirements for the feature. + +### Non-requirements +What is explicitly out of scope. Naming non-requirements up front prevents scope creep during review. + +### Constraints +Compatibility requirements (older clients, existing on-disk formats, upstream ClickHouse parity), +deadlines, licensing, platform restrictions. + +### References +URLs of background documents, GitHub issues, and pull requests that have +information relevant to the design. + +--- + +## 2. 
Functional specification

### User-facing behavior
Describe the feature strictly from the user's perspective — what they type, what they see back.
No implementation details here.

### SQL syntax / API
Concrete examples. Wrap SQL keywords, function names, setting names, and engine names in inline
code blocks (e.g. `MergeTree`, `SELECT`, `max_threads`).

```sql
-- Example 1: basic usage
SELECT ...

-- Example 2: edge case
...
```

### Settings
Relevant settings, presented in tabular form.

| Setting | Scope | Default | Range | Description |
| --- | --- | --- | --- | --- |
| `setting_name` | server / user / query | ... | ... | ... |

Note which settings are new vs. changes to existing ones, and whether defaults change behavior
for existing workloads.

### System tables / metrics / log messages / observability
New or changed rows in `system.settings`, `system.metrics`, `system.events`, `system.errors`,
`system.*_log`. Any new `ProfileEvents` or `CurrentMetrics` counters. Highlight data that can
provide observability information to ClickHouse administrators.

### Error behavior
What exceptions are thrown, with which error codes, on which inputs. Say "throws exception" rather
than "crashes" — logical errors don't crash release builds.

### Backward compatibility
Highlight compatibility issues, for example:
- Older clients talking to a new server
- New clients talking to an older server
- Mixed-version clusters (replication, distributed queries)
- On-disk format changes (mark version, metadata version)
- Default changes that alter existing query behavior

---

## 3. Implementation

### Architecture overview
One paragraph, plus a diagram if it helps. Where does the feature live in the codebase, and how
does it connect to existing subsystems (parser → analyzer → planner → executor → storage)?

### Key design decisions
Highlight prominent aspects of the design that would be useful to implementors or maintainers.

### Concurrency / locking
Threads involved, locks held, ordering constraints. If none, state that explicitly.

### Storage format changes
Mark/metadata version bumps, migration path, how older parts are read after upgrade, how newer
parts are rejected by older servers.

### Performance
- Expected overhead in the hot path (CPU, memory, I/O)
- Which benchmarks will exercise this (e.g. specific `tests/performance/` cases)
- Memory ownership and lifetime of any new allocations

### Alternatives considered
Short list of approaches you rejected and why. This is the single most useful section for
reviewers — it shows you explored the design space.

### Open questions
Things you don't yet have an answer to. Better to name them than to paper over them.

---

## 4. Test plan

### Functional tests — `tests/queries/0_stateless`
List planned test files. Prefer adding new tests over extending existing ones.

- Golden path: basic feature works on simple input
- Edge cases: empty input, single row, boundary values, max sizes
- Error cases: invalid input produces the expected exception with the expected error code
- Settings interactions: feature off (default), feature on, interaction with related settings
- Compatibility: works with older parts / mixed formats, if relevant

### Integration tests — `tests/integration`
Required if the feature touches replication, keeper, distributed queries, S3, auth, Kafka, etc.
Invoke with `python -m ci.praktika run "integration" --test `. 
+ +### Performance tests — `tests/performance` +Required if the feature is in the hot path. + +### Manual verification +Any checks that aren't automated (UI, metrics dashboards, upgrade scenarios). + +### Rollout / risk +Deployment risk, whether a feature flag is warranted (default off? default on?), the existing +tests that guard against regressions, and what to watch in production after rollout. From 2fb4a60ac2e7e77e39e69fcba03d5393906ac009 Mon Sep 17 00:00:00 2001 From: Robert Hodges Date: Thu, 23 Apr 2026 16:53:19 -0700 Subject: [PATCH 2/5] Update design to pull in EXPORT PARTITION behavior Also incorporated design feedback. Signed-off-by: Robert Hodges --- .../alter-table-export-part-partition.md | 615 ++++++++++++++---- .../skills/antalya-feature-design/SKILL.md | 31 +- .../assets/design-template.md | 4 +- 3 files changed, 514 insertions(+), 136 deletions(-) diff --git a/antalya/docs/design/alter-table-export-part-partition.md b/antalya/docs/design/alter-table-export-part-partition.md index d21704f6af00..96b6b13ce18f 100644 --- a/antalya/docs/design/alter-table-export-part-partition.md +++ b/antalya/docs/design/alter-table-export-part-partition.md @@ -23,22 +23,41 @@ into segments, placing hot data on replicated block storage and cold data on shared Iceberg tables using Parquet data files. The hybrid table approach requires a robust mechanism to export table -data from MergeTree tables to Iceberg. The mechanism must be fast, -use machine resources efficiently, handle failures automatically, and -be easy to monitor. This design covers two new ClickHouse commands to -export data. - -* `ALTER TABLE EXPORT PART` -- Exports a single part to Iceberg - -* `ALTER TABLE EXPORT PARTITION` -- Exports one or more partitions to Iceberg - -These commands replace `INSERT INTO ... SELECT FROM` pipelines that select -rows and write them out to one or more Parquet files. This approach -costs an extra decode/sort pass per export, does not coordinate across -replicas, and does not take advantage of existing partitioning and -sorting in MergeTree. `EXPORT PART` / `EXPORT PARTITION` write parts -directly to object storage from the source replica(s), preserving the -source sort order and cutting out the `SELECT` pipeline. +data from `MergeTree` tables into shared storage. The mechanism must +be fast, use machine resources efficiently, handle failures +automatically, and be easy to monitor. This design covers two new +ClickHouse commands to export data so users can populate hybrid +tables and move data to them at regular intervals. + +* `ALTER TABLE EXPORT PART` -- Exports a single part to a destination. + +* `ALTER TABLE EXPORT PARTITION` -- Exports one or more partitions to a destination. + +Both commands accept two destination families: + +1. **Plain object storage** (`S3`, `AzureBlobStorage`, equivalents). + Output is Parquet laid out in hive-partitioned directories. Atomicity + is provided by a sidecar *commit file* that enumerates the data files + written in the transaction; readers that want atomicity filter by + commit. An external catalog (Glue, REST, Nessie, Lakekeeper, ...) can + register this layout as an Iceberg table afterward, but the export + commands themselves do not interact with any catalog in this mode. + +2. **Apache Iceberg tables, with or without a catalog** (`Iceberg*` + engines, `iceberg*` table functions, `DatabaseIceberg`). 
Output is + Parquet data files plus per-file Avro *statistics sidecars*; on + commit the initiating replica assembles a new Iceberg manifest, + writes a new `metadata.json`, and swaps the catalog pointer (or the + warehouse `metadata.json` pointer when catalog-less). Atomicity is + native — the snapshot either exists or it does not. + +These commands replace `INSERT INTO ... SELECT FROM` pipelines that +select rows and write them out to one or more Parquet files. That +approach uses resources for sorting, does not coordinate across replicas, +and does not take advantage of existing partitioning and sorting in +`MergeTree`. `EXPORT PART` and `EXPORT PARTITION` write parts directly +to the destination from the source replica(s), preserving the source +sort order and cutting out the `SELECT` pipeline. ### Requirements @@ -49,8 +68,11 @@ source sort order and cutting out the `SELECT` pipeline. 2. **Efficient, order-preserving writes.** Write a specified `MergeTree` part (or every part of a specified partition) to an object-storage destination in Parquet, preserving the source part's sort order, - without using a `SELECT` pass and also minimizing the RAM required - to hold data during transfer. + without using a `SELECT ORDER BY` pass. Exporting a part should use the + same or less RAM than doing an `INSERT...SELECT...ORDER BY` on the same + data. (For example, it's not uncommon for the latter command to run + out of memory on large parts when the part ordering is added to the + SELECT ORDER BY.) 3. **Output file management.** Allow users to break exported parts into smaller Parquet files, which helps ensure good performance @@ -62,9 +84,16 @@ source sort order and cutting out the `SELECT` pipeline. a Hybrid table should be able to read the data back from Iceberg without requiring changes. -5. **Atomic transfer.** Readers should never see a partial export in - the target Iceberg table. Exported part(s) and partition(s) should - be visible in their entirety in Iceberg or not at all. +5. **Atomic transfer.** Readers should never see a partial export. + The mechanism depends on the destination family: + - **Plain object storage.** Each transaction writes a sidecar + commit file that lists every data file produced by the transaction. + Readers that want atomicity filter by commit; a crash before the + commit file lands leaves only orphaned data files. + - **Iceberg destinations.** Atomicity is provided by Iceberg's own + snapshot-commit protocol — the new snapshot either becomes the + current metadata pointer or it does not. A crash before the + pointer swap leaves only orphaned data files and sidecars. 6. **Distributed operation.** - `EXPORT PART` always runs locally on the ClickHouse host where @@ -79,7 +108,6 @@ source sort order and cutting out the `SELECT` pipeline. It must be possible for users to track the following from system tables: - Export part request status. - Export partition request status. - - History of exported parts including whether export succeeded or failed. - Relevant profile events related to export. 8. **Error recovery.** @@ -87,43 +115,54 @@ source sort order and cutting out the `SELECT` pipeline. destination is a no-op while the export is running. (There should be a way to track 'recent' exports so that they are idempotent as well.) - - **Clean-up.** There must be a procedure to clean up a failed - export using only SQL commands. 
+ - **Clean-up.** If file or metadata clean-up is required before resubmitting a failed + export, it must be possible to do so using only SQL commands. - **Automatic restart.** `EXPORT PARTITION` task is persistent and resumes after restarts. 9. **Killable.** It must be possible to terminate any `ALTER TABLE EXPORT` command. - The command should be idempotent and must throw a clear exception on failure - rather than hanging. + The command should be idempotent and must throw a clear exception on failure + rather than hanging. + +### Future requirements + +The design should accomodate the following extensions in the near future. + +- EXPORT PARTITION for MergeTree tables. Must work without Keeper installation. +- Export history. Provide a system table to track the history of part exports. -### Non-requirements +### Out of scope requirements -- Non-Parquet output formats. Only `Parquet` is targeted in this iteration. -- `partition_strategy = 'wildcard'` destinations — only `'hive'` is supported; others throw - `NOT_IMPLEMENTED`. +- Non-Parquet output file formats. Only `Parquet` is targeted in this iteration. Later + iterations may add new output file formats. - Exporting to arbitrary table functions. Only those backed by an object-storage engine that supports exports (e.g. `s3`, `azure`) are valid; others throw `NOT_IMPLEMENTED`. -- Schema evolution between source and destination. Schemas must match (`INCOMPATIBLE_COLUMNS` - otherwise); destination cannot have a column that matches a source `EPHEMERAL`. +- Iceberg schema or partition-spec evolution at commit time. Not supported. The source + `MergeTree` schema and partition keys must be compatible with the destination + Iceberg table's current `schema-id` and `partition-spec-id`. Destination partition values + are derived directly from the source part's partition key; we do not recompute them + from row data. - Any read/query path over exported files — consumption happens via normal `S3` / `s3` / external-engine reads. -- Synchronous exports. Both commands return immediately; completion is polled via system tables. +- Synchronous exports. Not supported. EXPORT commands return immediately to client after + starting the export task; completion is polled via system tables. - Importing parts back from object storage (that is tracked separately). ### Constraints - Experimental gate: `allow_experimental_export_merge_tree_part` (query-level) for `EXPORT PART`; - `enable_experimental_export_merge_tree_partition_feature` (server-level) for `EXPORT PARTITION`. -- `EXPORT PARTITION` requires a ZooKeeper / `clickhouse-keeper` ensemble with the `multi_read` - feature flag. -- Destination table must use `partition_strategy = 'hive'`. + `allow_experimental_export_merge_tree_partition_feature` (server-level) for `EXPORT PARTITION`. +- For best results `EXPORT PARTITION` requires a ZooKeeper / `clickhouse-keeper` ensemble with + the `multi_read` feature flag. This reduces API calls and ensures transactional consistency + when reading multiple fields. + governed by the destination table's Iceberg partition spec instead. - No change to `MergeTree` on-disk part format; only the Keeper schema under the table's replication path is extended. 
### References -- `docs/en/engines/table-engines/mergetree-family/part_export.md` -- `docs/en/engines/table-engines/mergetree-family/partition_export.md` +- `docs/en/antalya/part_export.md` +- `docs/en/antalya/partition_export.md` - `tests/queries/0_stateless/03572_export_merge_tree_part_basic.sh` - `tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.sql` - `tests/queries/0_stateless/03572_export_merge_tree_part_limits_and_table_functions.sh` @@ -140,12 +179,28 @@ source sort order and cutting out the `SELECT` pipeline. ## 2. Functional specification ### User-facing behavior -A user points `ALTER TABLE` at a `MergeTree` source and an object-storage destination (table or -table function). The command returns immediately with no rows. The export runs in the background; -progress lives in `system.exports` and, for partition exports, `system.replicated_partition_exports`. -Successful exports append to `system.part_log` with `event_type = 'ExportPart'`. Writers to -object storage drop one Parquet data file per part (or per chunk when split by size/rows) plus -one commit file per transaction. +A user points `ALTER TABLE` at a `MergeTree` source and a destination (table or table function). +The command returns immediately with no rows. The export runs in the background; progress lives +in `system.exports` and, for partition exports, `system.replicated_partition_exports`. Successful +exports append to `system.part_log` with `event_type = 'ExportPart'`. + +Output shape depends on the destination family: + +- **Plain object storage.** One Parquet data file per part (or per chunk when split by + size/rows) plus one commit file per transaction — `//_..parquet` + and `/commit_<...>`. Readers that want atomicity filter by commit. + +- **Iceberg destination.** One Parquet data file per part (or per chunk) plus a sibling + Avro statistics sidecar `_clickhouse_export_part_sidecar.avro` carrying + `record_count`, `file_size_in_bytes`, `column_sizes`, `null_value_counts`, `lower_bounds`, + and `upper_bounds`. The per-part task does not modify Iceberg metadata. On final commit + the initiating replica reads every sidecar, assembles a new manifest and manifest list, + writes a new `metadata.json`, and atomically swaps the pointer (via the catalog when one + is configured; otherwise via the warehouse `metadata.json` pointer). The manifest summary + contains `clickhouse.export-partition-transaction-id`, checked before every commit attempt + to prevent a double-commit after a post-commit / pre-status-update crash. Sidecar files + are not referenced from any Iceberg manifest and can be deleted safely after the commit + lands; ClickHouse does not reap them. ### SQL syntax / API @@ -168,6 +223,12 @@ ALTER TABLE [db.]table TO TABLE [dest_db.]dest_table [SETTINGS ...]; +-- Export every active part of all partitions (Replicated*MergeTree only) +ALTER TABLE [db.]table + EXPORT PARTITION ALL + TO TABLE [dest_db.]dest_table + [SETTINGS ...]; + -- Cancel one or more partition exports KILL EXPORT PARTITION WHERE ; ``` @@ -191,27 +252,26 @@ ALTER TABLE big EXPORT PART '2025_0_32_3' TO TABLE big_dest SETTINGS allow_experimental_export_merge_tree_part = 1, export_merge_tree_part_max_bytes_per_file = 10000000, output_format_parquet_row_group_size_bytes = 5000000; +-- (See note on settings below. Iceberg table engine now has built-in +-- settings for Parquet files.) 
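-- (Added sketch, not part of the original examples.) The same part export routed to a
-- hypothetical Iceberg-engine destination `big_dest_iceberg`, leaning on the Iceberg /
-- Parquet settings listed in NOTE 1 below rather than the generic byte cap.
ALTER TABLE big EXPORT PART '2025_0_32_3' TO TABLE big_dest_iceberg
SETTINGS allow_experimental_export_merge_tree_part = 1,
         iceberg_insert_max_bytes_in_data_file = 10000000,
         output_format_parquet_compression_method = 'zstd';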
-- Partition export across a Replicated cluster ALTER TABLE rmt_table EXPORT PARTITION ID '2020' TO TABLE s3_table; --- Cancel by filter +-- Cancel by filter. The WHERE uses the same filter used to read from `system.replicated_partition_exports`. KILL EXPORT PARTITION WHERE partition_id = '2020' AND source_table = 'rmt_table' AND destination_table = 's3_table'; ``` -### End-to-end example +### End-to-end examples -Create a `ReplicatedMergeTree` source, seed two partitions from -`system.numbers`, create a hive-partitioned S3 destination (the on-disk -shape an external Iceberg catalog such as Glue / REST / Nessie / -Lakekeeper registers as an Iceberg table), export one partition, -and verify: +Two parallel walkthroughs illustrate each destination family. Both +begin from the same `ReplicatedMergeTree` source. ```sql --- 1. Source table: Replicated MergeTree partitioned by year. +-- Source table (shared by both examples). CREATE TABLE events ( id UInt64, @@ -222,7 +282,7 @@ ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/events', 'r1') PARTITION BY year ORDER BY (year, id); --- 2. Seed two partitions (2024, 2025) straight from `system.numbers`. +-- Seed two partitions (2024, 2025) straight from `system.numbers`. INSERT INTO events SELECT number AS id, @@ -238,11 +298,18 @@ SELECT 2025 AS year FROM system.numbers LIMIT 1000000; +``` --- 3. Destination: S3 with hive partition layout. This is the on-disk shape that --- an Iceberg catalog registers as a table; the `EXPORT PARTITION` command --- itself does not touch any catalog. -CREATE TABLE events_iceberg +#### Plain object storage (hive layout) + +Export to a hive-partitioned S3 destination. The on-disk shape is what +an external Iceberg catalog (Glue, REST, Nessie, Lakekeeper, ...) would +register as an Iceberg table; the `EXPORT PARTITION` command itself +does not touch any catalog in this mode. + +```sql +-- 1. Destination: S3 with hive partition layout. +CREATE TABLE events_s3 ( id UInt64, ts DateTime, @@ -251,72 +318,180 @@ CREATE TABLE events_iceberg ENGINE = S3(s3_conn, filename='warehouse/events', format = Parquet, partition_strategy = 'hive') PARTITION BY year; --- 4. Export the 2024 partition. Returns immediately; runs in the background. -ALTER TABLE events EXPORT PARTITION ID '2024' TO TABLE events_iceberg; +-- 2. Export the 2024 partition. Returns immediately; runs in the background. +ALTER TABLE events EXPORT PARTITION ID '2024' TO TABLE events_s3; --- 5. Watch progress (Keeper round-trip — use sparingly). +-- 3. Watch progress (Keeper round-trip — use sparingly). SELECT status, parts_count, parts_to_do, last_exception FROM system.replicated_partition_exports WHERE source_table = 'events' AND partition_id = '2024'; --- 6. When status = 'COMPLETED', the destination bucket contains: +-- 4. When status = 'COMPLETED', the destination bucket contains: -- warehouse/events/year=2024/_.1.parquet (one per part) -- warehouse/events/commit_2024_ (atomicity manifest) -- Readers that filter by commit see either the full partition or nothing. -SELECT count() FROM events_iceberg WHERE year = 2024; +SELECT count() FROM events_s3 WHERE year = 2024; --- 7. Or inspect the layout directly. +-- 5. Or inspect the layout directly. SELECT _path FROM s3(s3_conn, filename = 'warehouse/events/year=2024/**', format = 'One') ORDER BY _path; ``` -The same flow works for a non-replicated `MergeTree` source — coordination collapses to the -local node, but the command, destination shape, and observability surfaces are identical. 
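The "filter by commit" read pattern above is only sketched in prose; a hedged illustration
follows. It assumes the commit file simply lists one committed data-file path per line and
that those paths match the `_path` virtual column, neither of which is pinned down by this
design, so treat it as a shape rather than a recipe.

```sql
-- Count only rows from data files named in the commit file (assumed format:
-- one data-file path per line; adjust the match if the stored paths differ).
SELECT count()
FROM s3(s3_conn, filename = 'warehouse/events/year=2024/*.parquet', format = 'Parquet')
WHERE _path IN
(
    SELECT line
    FROM s3(s3_conn, filename = 'warehouse/events/commit_2024_*', format = 'LineAsString')
);
```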
+#### Iceberg destination + +Export directly to an Iceberg table. Unlike the plain-object-storage +flow, `EXPORT PARTITION` writes native Iceberg metadata on commit, so +the result is queryable as an Iceberg table immediately — no external +registration step. The example uses `IcebergS3` without a catalog; +swap in `DatabaseIceberg` or an `iceberg(...)` catalog-backed table +function to route through REST / Glue / Unity. + +```sql +-- 1. Destination: an Iceberg table backed by S3. No catalog required +-- for this form; the warehouse metadata.json pointer is managed +-- by ClickHouse directly. +CREATE TABLE events_iceberg +( + id UInt64, + ts DateTime, + year UInt16 +) +ENGINE = IcebergS3(s3_conn, filename='warehouse/events_iceberg/') +PARTITION BY year; + +-- 2. Export the 2024 partition. Returns immediately; runs in the background. +ALTER TABLE events EXPORT PARTITION ID '2024' TO TABLE events_iceberg; + +-- 3. Watch progress. +SELECT status, parts_count, parts_to_do, last_exception +FROM system.replicated_partition_exports +WHERE source_table = 'events' AND partition_id = '2024'; + +-- 4. When status = 'COMPLETED', the destination contains a fully-formed +-- Iceberg table. Readers see the snapshot atomically — either the +-- full partition or nothing. +SELECT count() FROM events_iceberg WHERE year = 2024; + +-- 5. Object layout: each data file has a sidecar carrying per-file stats +-- used at commit time. These are ClickHouse-private and unreferenced +-- from any Iceberg manifest; safe to delete after COMPLETED. +SELECT _path +FROM s3(s3_conn, filename = 'warehouse/events_iceberg/data/year=2024/**', format = 'One') +ORDER BY _path; +-- warehouse/events_iceberg/data/year=2024/_.1.parquet +-- warehouse/events_iceberg/data/year=2024/_.1.parquet_clickhouse_export_part_sidecar.avro +-- ... +-- warehouse/events_iceberg/metadata/v.metadata.json +-- warehouse/events_iceberg/metadata/snap--*.avro +-- warehouse/events_iceberg/metadata/.avro + +-- 6. The commit carries an idempotency marker in the manifest summary +-- (clickhouse.export-partition-transaction-id). If the initiator +-- crashes post-commit / pre-status-update, a retry sees its own +-- transaction id in the target's latest manifest and skips instead +-- of double-committing. +``` + +The EXPORT PARTITION flow initially only works on ReplicatedMergeTree tables and +requires Keeper. Future iterations will support MergeTree tables as a source. ### Operational notes The following notes expand on expected behavior of commands. -1. `ALTER TABLE t EXPORT PART 'p' TO TABLE s3_t` writes +1. When writing to object storage using `partition_strategy = 'wildcard'`, either wildcard + or 'hive' arguments are permitted. (This setting has no impact on Iceberg, + which records file partitions using metadata.) + +2. By default `ALTER TABLE t EXPORT PART 'p' TO TABLE s3_t` writes `//_.1.parquet` plus - `/commit__`, readable end-to-end via - `SELECT * FROM s3(...)` in tests `03572_*` and `03608_*`. + `/commit__`. You can read this using + `SELECT * FROM s3(...)`. -2. `ALTER TABLE rmt EXPORT PARTITION ID 'p' TO TABLE s3_t` exports +3. `ALTER TABLE rmt EXPORT PARTITION ID 'p' TO TABLE s3_t` exports every active part of partition `p` across all replicas that host it; `system.replicated_partition_exports` converges to `COMPLETED`. -3. Re-issuing the same `EXPORT PARTITION` within +4. Re-issuing the same `EXPORT PARTITION` within `export_merge_tree_partition_manifest_ttl` is a no-op (no - duplicate files) unless `export_merge_tree_partition_force_export = 1`. 
+ duplicate files) unless `export_merge_tree_partition_force_export = 1`. This + behavior avoids accidentally exporting the same data twice. -4. Killing an in-flight partition export via `KILL EXPORT PARTITION` +5. Killing an in-flight partition export via `KILL EXPORT PARTITION` transitions status to `KILLED` and stops all replicas' contributions. -5. Exception during part export is counted in `PartsExportFailures`; - retry behavior honors `export_merge_tree_partition_max_retries`. +6. Exception during part export is counted in `PartsExportFailures`; + retry behavior honors `export_merge_tree_partition_max_retries`. The + same budget also bounds per-task commit retries for Iceberg + destinations; the task fails terminally if commit retries alone + exceed the budget. + +7. A partition export that remains in `PENDING` longer than + `export_merge_tree_partition_task_timeout_seconds` (default 3600s; + `0` disables) is auto-killed by the background cleanup loop and + transitions to `KILLED` with a timeout reason recorded in + `last_exception`. Enforcement is best-effort: actual kill latency is + bounded by one manifest-updater poll cycle (~30s) plus ZooKeeper + watch propagation. This is primarily a backstop against tasks stuck + on missing parts, a missing destination table, or an infinite + commit-retry loop against Iceberg. + +8. **Third-party Iceberg catalog manifest cleanup.** If the catalog + reaps old manifest files, its retention window MUST exceed + `export_merge_tree_partition_task_timeout_seconds`. Otherwise the + rare "sole node commits to Iceberg, crashes before the `COMPLETED` + status reaches Keeper, boots back up after the reaper has deleted + its commit manifest" scenario can produce duplicate data when the + recovered node retries the commit. ClickHouse does not reap its own + export artifacts; the Iceberg sidecars (`*_clickhouse_export_part_sidecar.avro`) + are safe to delete once the commit has landed. + +9. **Settings in `PART` vs `PARTIION` export.** There is a subtle difference in + how settings are handled between PART and PARTITION export. + - For part export, the settings used at the moment of the query are preserved + and re-stored in the background export task. + - For partition export, it is slightly harder to preserve the settings because + we need to serialize them in ZooKeeper. This is done for a few very important + settings including export_merge_tree_part_max_bytes_per_file. This will be + cleaned up in future iterations. ### Settings | Setting | Scope | Default | Range / Values | Applies to | Description | | --- | --- | --- | --- | --- | --- | | `allow_experimental_export_merge_tree_part` | query | `false` | `Bool` | `EXPORT PART` | Experimental gate; required. | -| `enable_experimental_export_merge_tree_partition_feature` | server | `false` | `Bool` | `EXPORT PARTITION` | Experimental gate; required. | +| `allow_experimental_export_merge_tree_partition_feature` | server | `false` | `Bool` | `EXPORT PARTITION` | Experimental gate; required. | | `export_merge_tree_part_overwrite_file_if_exists` | query | `false` | `Bool` | `EXPORT PART` | Overwrite existing destination file; otherwise throws. | -| `export_merge_tree_part_max_bytes_per_file` | query | `0` | `UInt64` (`0`=unlimited) | both | Soft cap per output file. Non-zero values can break idempotency. | +| `export_merge_tree_part_max_bytes_per_file` | query | `0` | `UInt64` (`0`=unlimited) | both | Soft cap per output file. Non-zero values can break idempotency. 
(SEE NOTE 1 below.)| | `export_merge_tree_part_max_rows_per_file` | query | `0` | `UInt64` (`0`=unlimited) | both | Soft cap per output file. Non-zero values can break idempotency. | | `export_merge_tree_part_throw_on_pending_mutations` | query | `true` | `Bool` | both | Refuse to export parts with pending mutations (unless mutation was `IN PARTITION`). | | `export_merge_tree_part_throw_on_pending_patch_parts` | query | `true` | `Bool` | both | Refuse to export parts with pending patch parts. | | `export_merge_tree_part_filename_pattern` | query | `{part_name}_{checksum}` | `String` | both | Filename template; supports `{part_name}`, `{checksum}`, `{database}`, `{table}`, server macros. | -| `export_merge_tree_partition_force_export` | query | `false` | `Bool` | `EXPORT PARTITION` | Overwrite a live Keeper manifest for the same `(source, destination, partition_id)`. | -| `export_merge_tree_partition_max_retries` | query | `3` | `UInt64` | `EXPORT PARTITION` | Per-part retry budget before the partition export fails. | -| `export_merge_tree_partition_manifest_ttl` | query | `180` (seconds) | `UInt64` | `EXPORT PARTITION` | Live-manifest TTL; acts as the idempotency window. Does not interrupt in-flight tasks. | +| `export_merge_tree_partition_force_export` | query | `false` | `Bool` | `EXPORT PARTITION` | Overwrite a live Keeper manifest for the same `(source, destination, partition_id)`. Dangerous — can produce duplicate data on the destination; use with caution. | +| `export_merge_tree_partition_max_retries` | query | `3` | `UInt64` | `EXPORT PARTITION` | Retry budget applied to both per-part export attempts and per-task commit attempts (Iceberg). The task fails terminally if commit retries alone exceed the budget. | +| `export_merge_tree_partition_manifest_ttl` | query | `180` (seconds) | `UInt64` | `EXPORT PARTITION` | Live-manifest TTL; acts as the idempotency window. Does not interrupt in-flight tasks. Keep this greater than `export_merge_tree_partition_task_timeout_seconds` if you want the `KILLED` entry to remain visible in `system.replicated_partition_exports` after the timeout fires. | +| `export_merge_tree_partition_task_timeout_seconds` | query | `3600` (seconds) | `UInt64` (`0`=disable) | `EXPORT PARTITION` | Wall-clock cap for `PENDING` tasks; on expiry transitions to `KILLED` with a timeout reason. Measured from manifest `create_time`. Enforcement latency ≈ one manifest-updater poll cycle (~30s) plus Keeper watch propagation. | +| `export_merge_tree_partition_system_table_prefer_remote_information` | query | `false` | `Bool` | `EXPORT PARTITION` | When `true`, `system.replicated_partition_exports` fetches fresh state from Keeper (requires the `MULTI_READ` feature flag); when `false`, uses local cached state. **Default flipped from `true` to `false` in this release** — Keeper round-trips were more expensive than warranted for the typical observability workload. | | `export_merge_tree_part_file_already_exists_policy` | query | `skip` | `skip` / `error` / `overwrite` | `EXPORT PARTITION` | Per-file policy during partition export. | -Default-value impact: all new settings default to "off" or to conservative -values (pending-mutation guards default to throwing). No existing query -behavior changes unless a user opts in. +Default-value impact: all new settings default to "off" or to +conservative values (pending-mutation guards default to throwing). 
One +default *changed* in this release: +`export_merge_tree_partition_system_table_prefer_remote_information` +flipped from `true` to `false`, so `system.replicated_partition_exports` +now serves local cached state by default instead of querying Keeper. +Users who relied on always-fresh results must set it back to `true` +explicitly. + +NOTE 1: EXPORT should also observe the following existing settings for export to Iceberg / Parquet: +- `iceberg_insert_max_bytes_in_data_file` (superseded by `export_merge_tree_part_max_bytes_per_file` if specified) +- `iceberg_insert_max_rows_in_data_file` +- `output_format_parquet_row_group_size` +- `output_format_parquet_row_group_size_bytes` +- `output_format_parquet_data_page_size` +- `output_format_parquet_compression_method` +- `output_format_parquet_version` ### System tables / metrics / log messages / observability @@ -353,8 +528,13 @@ behavior changes unless a user opts in. `FILE_ALREADY_EXISTS` (TBD — confirm). - Duplicate live manifest without `..._force_export`: `DUPLICATE_EXPORT_TASK` or equivalent (TBD — confirm). +- Commit-retry budget exhausted (Iceberg destination): terminal `FAILED` with + `last_exception` populated; the task is not retried further. +- Task timeout exceeded (`export_merge_tree_partition_task_timeout_seconds`): + terminal `KILLED` with a timeout reason in `last_exception`. -All of the above **throw exceptions**; they do not crash the server. +All of the above **throw exceptions** or surface through `last_exception` on the +task row; they do not crash the server. ### Backward compatibility - **Older client → newer server:** harmless — the client issues the new `ALTER` text; the server @@ -368,7 +548,11 @@ All of the above **throw exceptions**; they do not crash the server. all the parts; otherwise the task stalls on `parts_to_do > 0`. An upgrade-ordering note for operators is required (section 4 / Rollout). - **On-disk format:** unchanged. Parts are read as-is; Parquet is produced on the fly. -- **Default-value changes:** none that affect existing workloads (see settings table). +- **Default-value changes:** `export_merge_tree_partition_system_table_prefer_remote_information` + flipped from `true` to `false`. Users running dashboards that read + `system.replicated_partition_exports` and require always-fresh state must set this + back to `true` explicitly (and ensure the Keeper `MULTI_READ` feature flag is enabled). + No other defaults that affect existing workloads (see settings table). --- @@ -388,6 +572,18 @@ Partition coordination: Keeper nodes under `/partition_exports/ manifest (parts list, policy), per-part assignment, per-replica progress, and the kill flag. Each replica watches `partition_exports` and picks up parts it holds locally. +For Iceberg destinations the part-level pipeline writes a Parquet data file plus a sibling +Avro statistics sidecar through `IcebergMetadata::MultipleFileWriter` (extended to compute +per-file stats: `record_count`, `file_size_in_bytes`, `column_sizes`, `null_value_counts`, +Iceberg-serialized lower/upper bounds). The per-part task does not touch any Iceberg +metadata. On the final commit step — once per transaction, on the initiating replica for +partition exports; at end-of-part for single-part exports — the coordinator calls the new +`IDataLakeMetadata::commitExportPartitionTransaction`, which reads every sidecar, +assembles a manifest and manifest list, writes a new `metadata.json`, and CAS-swaps the +catalog pointer (or the warehouse `metadata.json` pointer when catalog-less). 
The Iceberg +`metadata.json` snapshot the transaction was opened against is stashed in the Keeper +manifest so any surviving replica can complete the commit. + ### Key design decisions 1. **Separate AST nodes for each command.** `EXPORT PART` and `EXPORT PARTITION` get distinct @@ -398,7 +594,11 @@ Each replica watches `partition_exports` and picks up parts it holds locally. 2. **Reuse the existing object-storage sink.** Export rides on the destination engine's existing Parquet writer (`StorageS3` / `StorageAzureBlob` sink). No new encoder. The sink is extended to accept an already-ordered stream and a target filename derived from - `export_merge_tree_part_filename_pattern`. + `export_merge_tree_part_filename_pattern`. Iceberg destinations route through the same + Parquet writer but via `IcebergMetadata::MultipleFileWriter`, which layers per-file + statistics collection on top. Commit is the Iceberg-specific part — it does not reuse the + object-storage sink's commit-file path and goes through + `IDataLakeMetadata::commitExportPartitionTransaction` instead (see decision #9). 3. **Stream parts in primary-key order, no re-`SELECT`.** The per-part reader walks the part in its on-disk order and feeds the Parquet writer directly, skipping the analyzer / planner / @@ -412,10 +612,19 @@ Each replica watches `partition_exports` and picks up parts it holds locally. side-effect whose assignment depends on which replica holds which part, so they warrant their own subtree. -5. **Atomicity via commit files.** Each transaction emits one `commit__` (part - level) or `commit__` (partition level) file that lists every data file - written. Readers wanting atomicity filter by commit; this avoids on-target renames or - multipart-transaction protocols on object storage. +5. **Atomicity depends on destination family.** + - *Plain object storage:* each transaction emits one + `commit__` (part level) or + `commit__` (partition level) file listing + every data file written. Readers wanting atomicity filter by + commit; this avoids on-target renames or multipart-transaction + protocols on object storage. + - *Iceberg destinations:* atomicity is native. The commit writes a + new Iceberg snapshot whose manifest summary embeds + `clickhouse.export-partition-transaction-id`. The pointer swap is + CAS-atomic at the catalog (or warehouse) level, so readers see + the snapshot in its entirety or not at all. The transaction id is + re-read before every commit attempt (see decision #11). 6. **Async model with three observability surfaces.** Commands return immediately. In-flight progress lives in `system.exports` (local, dropped on completion); @@ -431,10 +640,37 @@ Each replica watches `partition_exports` and picks up parts it holds locally. 8. **Two experimental gates, asymmetric scope.** `EXPORT PART` is gated query-level (`allow_experimental_export_merge_tree_part`) — individual users can try it. `EXPORT PARTITION` - is gated server-level (`enable_experimental_export_merge_tree_partition_feature`) because it + is gated server-level (`allow_experimental_export_merge_tree_partition_feature`) because it writes to Keeper and engages cluster coordination — rollout is an operator decision, not a per-query one. +9. **One commit abstraction for data-lake destinations.** `IDataLakeMetadata` gains a + `commitExportPartitionTransaction` virtual (default implementation throws + `NOT_IMPLEMENTED`; `IcebergMetadata` overrides). 
Callers pass catalog, table id, + transaction id, schema id, partition-spec id, partition values, and the list of data + file paths. Sidecars are discovered by filename convention relative to each data file + path. Future data-lake backends (Delta, Hudi) can override; non-overriding backends + short-circuit to the plain-object-storage commit-file path at the caller. + +10. **Per-file Iceberg statistics live in object-storage sidecars, not in Keeper or memory.** + A partition export is long-running and must survive node restarts. Column stats computed + during write cannot be recovered cheaply at commit time (would require re-reading every + Parquet file), and Keeper is not sized for one-znode-per-file with a stats payload. The + sidecar (`_clickhouse_export_part_sidecar.avro`, schema + `data_file_sidecar_schema` in `AvroSchema.h`) is the persistence mechanism: each + per-part task writes one sidecar next to its data file; the commit step reads all + sidecars in one pass. ClickHouse does not reap sidecars; users may delete them after + `COMPLETED`. + +11. **Two layers of commit idempotency for Iceberg destinations.** Layer 1 (existing, + Keeper): duplicate `(source, destination, partition_id)` submissions are refused while + the manifest is live. Layer 2 (new, Iceberg manifest): the transaction id is written + into every commit as the `clickhouse.export-partition-transaction-id` summary field and + re-checked before the next commit attempt. Layer 1 prevents two concurrent submissions + from racing; Layer 2 protects a single task across its own crash-retry boundary (a + post-commit / pre-Keeper-status-update crash would otherwise double-commit when the + recovered initiator retries). + ### Concurrency / locking - Per-part export holds the part's `DataPartStorage` lock for the duration of the read (same @@ -445,18 +681,37 @@ Each replica watches `partition_exports` and picks up parts it holds locally. `background_pool_size` knob or add a dedicated one — TBD). - Idempotency against duplicate submission is enforced at the Keeper manifest level (unique `(source, destination, partition_id)` while manifest is live). +- Commit-attempt counter for Iceberg destinations is a Keeper znode + (`/commit_attempts`) and shares the `export_merge_tree_partition_max_retries` + budget with per-part retries. Exhausting either path terminates the task. +- Manifest-updater status-drain invariant: the status queue is drained holding the status + lock only, never nested inside the export-partition lock. The earlier nested-lock pattern + caused a race window where a replica could miss status transitions for a sibling task; + this invariant is load-bearing for `system.replicated_partition_exports` freshness and + for `KILL EXPORT PARTITION` propagation. ### Storage format changes - **On-disk parts:** unchanged. -- **Keeper:** adds `/partition_exports/` subtree. Older servers ignore unknown - Keeper children; no schema version bump required but coordinator code MUST be tolerant of - concurrent removal by another version (TBD — verify). -- **Object-storage layout:** `//..` plus +- **Keeper:** adds `/partition_exports/` subtree. The per-task manifest carries + the parts list, policy, per-replica progress, kill flag, `task_timeout_seconds`, + `commit_attempts`, and — for Iceberg destinations — the source `metadata.json` snapshot + the transaction was opened against plus the `write_full_path_in_iceberg_metadata` flag, + so any surviving replica can complete the commit without re-reading catalog state. 
Older + servers ignore unknown Keeper children; no schema version bump required but coordinator + code MUST be tolerant of concurrent removal by another version (TBD — verify). +- **Object-storage layout (plain OS):** + `//..` plus `/commit_` (part-level) or - `/commit__` (partition-level). Readers that don't understand - commit files will see the data files directly — this is acceptable for non-atomic readers but - callers wanting atomicity MUST filter by commit. + `/commit__` (partition-level). Readers that don't + understand commit files will see the data files directly — callers wanting atomicity + MUST filter by commit. +- **Object-storage layout (Iceberg):** + `/data//..parquet` plus a sibling + `/data//..parquet_clickhouse_export_part_sidecar.avro`, + plus the standard Iceberg `/metadata/v.metadata.json`, + `/metadata/snap-*.avro`, and `/metadata/.avro`. + Sidecars are unreferenced from any Iceberg manifest and are ClickHouse-private. ### Performance @@ -468,6 +723,12 @@ Each replica watches `partition_exports` and picks up parts it holds locally. `export_merge_tree_part_max_bytes_per_file` / `_max_rows_per_file` set. - Benchmark coverage: TBD — propose a `tests/performance/export_merge_tree_part.xml` comparing `EXPORT PART` vs. `INSERT INTO s3_t SELECT FROM mt_t WHERE _part = ...` over a ~1 GB part. +- Commit cost for Iceberg destinations: linear in the number of data files. One sidecar + read + one Avro manifest write + one `metadata.json` write + one catalog CAS. For a + partition with thousands of parts, this is seconds of extra work on top of the per-part + write phase — bounded, not per-row. +- No extra commit cost for plain-object-storage destinations beyond the single commit-file + write. ### Alternatives considered @@ -487,6 +748,12 @@ Each replica watches `partition_exports` and picks up parts it holds locally. 5. **Non-Keeper coordination (leader replica drives everything)** — rejected; would require a new leader-election path and wouldn't survive leader restart without a Keeper-backed manifest anyway. +6. **Per-file Iceberg stats in Keeper** — rejected. One znode per data file with a stats + payload inflates Keeper state by the size of the dataset being exported; Keeper is not + object storage. +7. **Commit each Iceberg part individually as its own snapshot** — rejected. Iceberg + best-practice is one snapshot per logical transaction; per-part snapshots produce + snapshot-log churn and make every reader's planning scan O(parts) instead of O(1). ### Open questions @@ -502,6 +769,21 @@ Each replica watches `partition_exports` and picks up parts it holds locally. is persistent" — clarify recovery semantics. - Is the manifest TTL enforced by the initiator or by a cluster-wide cleanup job? Affects what happens when the initiator is offline. +- `EXPORT PART` currently runs its commit step inline (per-part), whereas `EXPORT PARTITION` + defers commit to the coordinator. The PR author flagged this asymmetry for rethinking — + should single-part export be a degenerate 1-part partition export internally, sharing the + commit primitive? Would simplify `IcebergMetadata` but adds a Keeper dependency to the + part-level path. +- `export_merge_tree_part_max_bytes_per_file` vs. `iceberg_insert_max_bytes_in_data_file`: + remove the former for Iceberg destinations in favor of the Iceberg-native setting, or + keep both for per-destination-family control? 
+- `IcebergImportSink` is introduced alongside the export path in this PR — is + `INSERT INTO iceberg_t` (regular write, not `ALTER TABLE EXPORT`) in scope for this + design or a sibling one? +- Should non-Iceberg data-lake backends (Delta, Hudi) fall back to the plain-object-storage + commit-file path when they do not override `commitExportPartitionTransaction`, or throw + `NOT_IMPLEMENTED`? Current default throws; falling back would mean exported data is + usable but not registered as a lake-format snapshot. --- @@ -541,29 +823,79 @@ New tests to add: `PartsExportFailures`, `PartsExportDuplicated`, `PartsExportTotalMilliseconds` move as expected. +Note: Iceberg-destination coverage is deliberately in integration (next section), not in +`tests/queries/0_stateless`, because it requires a live warehouse and — for the catalog +path — a REST / Glue fixture. The commit-file test above only exercises plain +object-storage atomicity. + Do not add `no-parallel` to any new test unless explicitly required by shared S3 bucket paths; `03604` currently has the tag and should be re-examined to see whether unique per-run paths remove the need. ### Integration tests — `tests/integration` -Keep: +**Keep (modified in this PR):** - `test_export_merge_tree_part_to_object_storage/` — part export in a multi-node setup. -- `test_export_replicated_mt_partition_to_object_storage/` — partition export across replicas, - including `wait_for_export_status`, retry counting, and replica failure scenarios. - -Add: - -- A case where the initiating replica dies mid-partition-export and a surviving replica must - complete the task (covers the open question above). -- A case where the experimental feature is disabled on one replica - (`disable_experimental_export_partition.xml` config) and enabled on the rest — confirm the - task still completes via the enabled replicas. -- `KILL EXPORT PARTITION` transitions status to `KILLED` and leaves no orphan in-flight writer. -- Mixed-version cluster: upgrade scenario where only some replicas know about `EXPORT PARTITION`. - -Invocation: `python -m ci.praktika run "integration" --test test_export_merge_tree_part_to_object_storage,test_export_replicated_mt_partition_to_object_storage`. + PR 1618 makes minor adjustments. +- `test_export_replicated_mt_partition_to_object_storage/` — partition export across + replicas, including `wait_for_export_status`, retry counting, and replica failure + scenarios. PR 1618 removes the `s3_retries.xml` config and reshapes several test cases + against the new shared helpers. + +**New in PR 1618:** + +- `test_export_merge_tree_part_to_iceberg/` — per-part export to an Iceberg destination, + covering golden path, sidecar emission, manifest shape, and error paths. +- `test_export_replicated_mt_partition_to_iceberg/` — distributed partition export to + Iceberg across replicas, including `test_export_task_timeout_kills_stuck_pending_task` + (uses the `export_partition_commit_always_throw` failpoint to exhaust the commit path, + then asserts the timeout transitions the task to `KILLED`). +- `test_storage_iceberg_with_spark/test_export_partition_iceberg.py` — catalog-less + Iceberg round-trip; Spark reads ClickHouse-written data and verifies schema, partition + layout, and snapshot atomicity. +- `test_storage_iceberg_with_spark/test_export_partition_iceberg_catalog.py` — + catalog-backed (REST) round-trip; exercises the `commitExportPartitionTransaction` + path against a real catalog. 
+- Shared helpers: + - `tests/integration/helpers/export_partition_helpers.py` — shared + `wait_for_export_status`, manifest-inspection utilities. + - `tests/integration/helpers/iceberg_export_stats.py` — sidecar decoders and stats + assertion helpers. + +**Remaining gaps to add:** + +- Initiating-replica dies mid-commit (post-data-file-write, pre-catalog-CAS) — asserts a + surviving replica completes via the Keeper-stashed `metadata.json` snapshot, and the + `clickhouse.export-partition-transaction-id` idempotency check prevents double-commit. +- Experimental feature disabled on one replica + (`disable_experimental_export_partition.xml` config) and enabled on the rest — task + still completes via the enabled replicas. +- `KILL EXPORT PARTITION` against an Iceberg task in mid-commit: status transitions to + `KILLED`, no dangling half-written `metadata.json`, data files and sidecars remain as + orphans (cleanup is user responsibility per Operational note 7 in § 2). +- Mixed-version cluster: upgrade scenario where only some replicas know about + `EXPORT PARTITION` or the new Keeper manifest fields. + +Invocation: +`python -m ci.praktika run "integration" --test test_export_merge_tree_part_to_object_storage,test_export_replicated_mt_partition_to_object_storage,test_export_merge_tree_part_to_iceberg,test_export_replicated_mt_partition_to_iceberg,test_storage_iceberg_with_spark`. + +### Failpoints + +The following failpoints are registered for deterministic testing of crash windows and +retry logic. Enable via `SYSTEM ENABLE FAILPOINT ` from test harnesses. + +| Failpoint | Kind | What it tests | +| --- | --- | --- | +| `iceberg_writes_non_retry_cleanup` | ONCE | Cleanup path when an Iceberg write fails in a non-retryable way. | +| `iceberg_writes_post_publish_throw` | ONCE | Commit succeeded in object storage but the publish step throws — exercises the recovery path that must not double-commit. | +| `iceberg_export_after_commit_before_zk_completed` | ONCE | Crash window between a successful Iceberg commit and the `COMPLETED` Keeper status update — the idempotency marker (`clickhouse.export-partition-transaction-id`) must prevent a second commit on recovery. | +| `export_partition_commit_always_throw` | REGULAR | Every commit attempt throws — used to exhaust `max_retries` and drive the task-timeout path. | +| `export_partition_status_change_throw` | ONCE | Throws during a manifest status transition — exercises the status-drain lock invariant (decision #11 in § 3) and the manifest-updating task's retry logic. | + +These failpoints replace the "simulate a crash before commit" phrasing in the +functional-tests section above; prefer them over process kills for deterministic CI +behavior. ### Performance tests — `tests/performance` @@ -571,6 +903,12 @@ Add `export_merge_tree_part.xml`: compare `ALTER TABLE ... EXPORT PART` vs. `INSERT INTO s3_t SELECT * FROM mt WHERE _part = ...` on a ~1 GB Wide part; track wall time and peak memory. Hot path is the Parquet encoder, which warrants a guard against regressions. +- Iceberg-destination benchmark: the Iceberg commit is O(data files), not per-row, so the + per-part write path performance should match plain object storage within noise. A + secondary benchmark comparing `EXPORT PARTITION` to Iceberg vs. to hive-layout S3 over + a ≥1000-part partition would catch regressions in the sidecar / manifest-assembly code + path specifically. 
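For concreteness, a sketch of the two statements such a benchmark would time, using the
placeholder names from the paragraph above (`mt`, `s3_t`, and an arbitrary part name):

```sql
-- Baseline: the INSERT ... SELECT path the export command is meant to replace.
INSERT INTO s3_t
SELECT * FROM mt WHERE _part = '2024_1_1_0';

-- Candidate: the direct part export.
ALTER TABLE mt EXPORT PART '2024_1_1_0' TO TABLE s3_t
SETTINGS allow_experimental_export_merge_tree_part = 1;
```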
+ ### Manual verification - Roundtrip: export partition → read via `SELECT * FROM s3(...)` → create new @@ -578,19 +916,46 @@ peak memory. Hot path is the Parquet encoder, which warrants a guard against reg - `system.replicated_partition_exports` behaviour under a crashed initiator (cluster restart). - Object-storage layout inspection via `s3(..., format=One)` listing: exactly N data files + 1 commit file per transaction. +- Iceberg roundtrip with an external reader: export a partition to an Iceberg destination + → read it back through the same catalog → row counts and column checksums match the + source. DuckDB is the preferred external reader here (lightweight, fast to stand up, + mature Iceberg support); Spark or Trino may be substituted where a specific catalog + integration needs to be exercised. Confirms the on-disk metadata we write is actually + interoperable, not just self-consistent. ### Rollout / risk -- **Risk:** Keeper schema extension is write-once; a partially-rolled-out cluster where only - some replicas understand the `partition_exports` subtree will stall partition exports - (`parts_to_do > 0`) rather than corrupt data. Acceptable but must be documented in the upgrade - notes. -- **Risk:** object-storage cost / accidental large exports. Mitigated by the experimental gate - (default off) and the manifest idempotency window. +- **Risk (Keeper schema extension):** the `partition_exports` subtree is write-once; a + partially-rolled-out cluster where only some replicas understand the subtree — or the + new manifest fields (`task_timeout_seconds`, `commit_attempts`, Iceberg `metadata.json` + snapshot, `write_full_path_in_iceberg_metadata`) — will stall partition exports + (`parts_to_do > 0`) rather than corrupt data. Acceptable but must be documented in the + upgrade notes. +- **Risk (object-storage cost / accidental large exports):** mitigated by the experimental + gates (default off) and the manifest idempotency window. +- **Risk (Iceberg catalog manifest retention):** if the catalog reaps old manifest files + with a retention window shorter than `export_merge_tree_partition_task_timeout_seconds`, + the rare "sole-node commits, crashes, recovers after reaper deleted the commit manifest" + scenario can duplicate data. Operators MUST verify their catalog's retention before + enabling Iceberg destinations (see Operational note 7 in § 2). +- **Risk (default flip on `_prefer_remote_information`):** dashboards that read + `system.replicated_partition_exports` now see local cached state by default instead of + Keeper-fresh state. Existing users must set the flag back to `true` explicitly if they + rely on always-fresh results (and ensure `MULTI_READ` is enabled). - **Flag strategy:** ship with `allow_experimental_export_merge_tree_part` (query, default - `false`) and `enable_experimental_export_merge_tree_partition_feature` (server, default - `false`). Flip defaults to `true` only after: (a) the open questions above are resolved, (b) - the new functional / integration tests land, (c) one release cycle of customer feedback. -- **Watch in production:** `PartsExportFailures`, `exception_count` on - `system.replicated_partition_exports`, Keeper watch counts under the `partition_exports` - subtree, and object-storage request-error rates. + `false`) and `allow_experimental_export_merge_tree_partition_feature` (server, default + `false`). Both destination families ride on these gates — no separate Iceberg gate. 
+ Flip defaults to `true` only after: (a) the open questions in § 3 are resolved, (b) the + remaining integration gaps listed above are closed, (c) one release cycle of customer + feedback. +- **Watch in production:** + - `PartsExportFailures` (existing). + - `exception_count` on `system.replicated_partition_exports` (existing). + - Keeper watch counts under the `partition_exports` subtree (existing). + - Object-storage request-error rates (existing). + - `commit_attempts` values approaching `max_retries` — signal of a catalog or network + issue throttling commits. + - Rate of `KILLED` transitions with timeout reason — signal of tasks stuck on missing + parts, missing destination, or commit backpressure. + - For Iceberg destinations: `metadata/` prefix write-error rate (CAS contention, + vended-credential expiry) and catalog-API error rate. diff --git a/antalya/skills/antalya-feature-design/SKILL.md b/antalya/skills/antalya-feature-design/SKILL.md index cef1afa1b659..9cc1ef86b7cd 100644 --- a/antalya/skills/antalya-feature-design/SKILL.md +++ b/antalya/skills/antalya-feature-design/SKILL.md @@ -42,6 +42,9 @@ path, then populate each section based on the conversation. Leave a section mark pointed question rather than inventing content — a visible gap is more useful than a plausible fabrication. +The default location for designs is antalya/docs/design. Create a new directory for the design +and place the .md file within it. + ### 3. Conventions to apply while writing These match the project `CLAUDE.md` and make the design consistent with the rest of the codebase: @@ -54,13 +57,18 @@ These match the project `CLAUDE.md` and make the design consistent with the rest - Cite files as `path/to/file.cpp:line` so reviewers can jump directly. - Prefer one-line statements of intent over prose padding. +Avoid long examples or deep specification detail in the design. Where +such detail is require, such as end-to-end examples or output formats, +put these in files of the form `annex-.md` and reference them with +markdown links. + ### 4. Push back on weak spots as you write Drafting is also a review — don't wait for section 4 to think. In particular: - If the **Motivation** reduces to "users want X" with no concrete workload, ask for one. -- If **Goals** are not measurable, say so and suggest a measurable form. -- If there are no **Non-goals**, propose a few. The absence of non-goals is the single biggest +- If **Requirements** are not measurable, say so and suggest a measurable form. +- If there are no **Non-requirements**, propose a few. The absence of non-goals is the single biggest source of scope creep. - If **Alternatives considered** is empty, press for at least one rejected approach. A design with no alternatives considered usually means the author has a solution looking for a problem. @@ -74,12 +82,18 @@ severity: **blocking** (must fix before implementation), **should-address** (fix **nit** (optional). Quote the exact text you're critiquing so the author can find it fast. ### Requirements +- The requirements section define the user problem. The problem should be stated in terms of needed + features, performance goals, delivery deadlines, plus topics we do not need to consider or solve. - Motivation cites a concrete workload, incident, or user report — not a generic assertion. - Requirements are measurable. "Faster" is not a requirement; "query `Q` drops from 5s to <500ms on dataset `D`" is. - Non-requirements are listed. Missing non-requirement almost always produce scope creep in review. 
+- Requirements should be generic. Put user-visible product behavior such as SQL commands in the Functional + Specification. Put internal (non-visible behavior) in the Implementation section. ### Functional specification +- The functional specification defines user visible product behavior including SQL commands, system tables, + settings, and error messages. - Every new/changed SQL syntax has at least one concrete example. - Every new setting has scope (server / user / query), default, and valid range. - Default-value changes that alter behavior for existing workloads are called out explicitly. @@ -90,6 +104,7 @@ severity: **blocking** (must fix before implementation), **should-address** (fix - Formatting convention: inline code blocks around SQL identifiers, engine names, settings. ### Implementation +- The implementation section covers internal design that is not visible to the user. - Architecture section names the subsystems touched (parser / analyzer / planner / executor / storage / replication / keeper). If it touches many, that's a design smell worth flagging. - New abstractions pull their weight. If an interface has one implementation and no foreseeable @@ -103,15 +118,13 @@ severity: **blocking** (must fix before implementation), **should-address** (fix internal code that already trusts its callers. ### Test plan -- Golden path, edge cases, and error cases are all enumerated — not just "add tests". +- The test plan defines happy path, edge cases, and error cases. +- Test case definitions should be specific, falsifiable propositions about feature behavior. +- Test case definitions should not contain implementation code, except as necessary + to describe the feature being tested. - Tests use `tests/queries/0_stateless` for functional coverage and `tests/integration` for anything touching replication, keeper, distributed queries, S3, auth, or Kafka. -- No `no-*` tags (especially `no-parallel`) unless there's a stated reason they're necessary. -- New tests are proposed as new files, not extensions of existing ones. -- Integration-test invocation is specified: - `python -m ci.praktika run "integration" --test `. -- Performance tests exist if the feature is in the hot path. -- Rollout section names the specific risk and whether a feature flag (and default) is warranted. +- Performance should be specified if the feature is in the hot path, i.e., affects response or resource usage. ### Cross-cutting red flags - Feature flags or backwards-compatibility shims added "just in case" — per project `CLAUDE.md`, diff --git a/antalya/skills/antalya-feature-design/assets/design-template.md b/antalya/skills/antalya-feature-design/assets/design-template.md index 171e0f5c64df..08b228ad9394 100644 --- a/antalya/skills/antalya-feature-design/assets/design-template.md +++ b/antalya/skills/antalya-feature-design/assets/design-template.md @@ -8,10 +8,10 @@ --- -## 1. Requirements +## 1. Problem Definition ### Motivation -What problem does this feature solve? Why now? Quote user reports, incidents, or concrete +What problem does this feature solve for users? Why now? Quote user reports, incidents, or concrete workloads where possible — avoid generic statements like "users want X". ### Requirements From 6257cf9176fb3f5fde8ef1d1b5ee7fbbf7f7e53e Mon Sep 17 00:00:00 2001 From: Robert Hodges Date: Thu, 23 Apr 2026 19:50:34 -0700 Subject: [PATCH 3/5] Fixed typo. 
Signed-off-by: Robert Hodges --- antalya/docs/design/alter-table-export-part-partition.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/antalya/docs/design/alter-table-export-part-partition.md b/antalya/docs/design/alter-table-export-part-partition.md index 96b6b13ce18f..1dbdceb3bb82 100644 --- a/antalya/docs/design/alter-table-export-part-partition.md +++ b/antalya/docs/design/alter-table-export-part-partition.md @@ -519,8 +519,7 @@ NOTE 1: EXPORT should also observe the following existing settings for export to `src/Common/ErrorCodes.cpp`). - Destination schema mismatch (columns / types / order; source `EPHEMERAL` column present in destination): `INCOMPATIBLE_COLUMNS`. -- Destination engine doesn't support exports (e.g. `url`, non-hive `partition_strategy`, unknown - engine): `NOT_IMPLEMENTED`. +- Destination engine doesn't support exports (e.g. `url`, unknown engine): `NOT_IMPLEMENTED`. - Destination is an unknown table function: `UNKNOWN_FUNCTION`. - Pending mutations or patch parts when the guard is enabled: `BAD_ARGUMENTS` (TBD — confirm). - Part not found on any replica: `NO_SUCH_DATA_PART` (TBD — confirm). From 915df130337cae7e1d539655a322a13273cb3b67 Mon Sep 17 00:00:00 2001 From: Robert Hodges Date: Sat, 25 Apr 2026 10:43:22 -0700 Subject: [PATCH 4/5] Address PR comments and drop Implementation section. Signed-off-by: Robert Hodges --- .../alter-table-export-part-partition.md | 279 +++--------------- .../assets/design-template.md | 8 + 2 files changed, 42 insertions(+), 245 deletions(-) diff --git a/antalya/docs/design/alter-table-export-part-partition.md b/antalya/docs/design/alter-table-export-part-partition.md index 1dbdceb3bb82..2b923064e361 100644 --- a/antalya/docs/design/alter-table-export-part-partition.md +++ b/antalya/docs/design/alter-table-export-part-partition.md @@ -46,7 +46,7 @@ Both commands accept two destination families: 2. **Apache Iceberg tables, with or without a catalog** (`Iceberg*` engines, `iceberg*` table functions, `DatabaseIceberg`). Output is Parquet data files plus per-file Avro *statistics sidecars*; on - commit the initiating replica assembles a new Iceberg manifest, + commit the EXPORT process assembles a new Iceberg manifest, writes a new `metadata.json`, and swaps the catalog pointer (or the warehouse `metadata.json` pointer when catalog-less). Atomicity is native — the snapshot either exists or it does not. @@ -55,9 +55,7 @@ These commands replace `INSERT INTO ... SELECT FROM` pipelines that select rows and write them out to one or more Parquet files. That approach uses resources for sorting, does not coordinate across replicas, and does not take advantage of existing partitioning and sorting in -`MergeTree`. `EXPORT PART` and `EXPORT PARTITION` write parts directly -to the destination from the source replica(s), preserving the source -sort order and cutting out the `SELECT` pipeline. +`MergeTree`. ### Requirements @@ -124,12 +122,16 @@ sort order and cutting out the `SELECT` pipeline. The command should be idempotent and must throw a clear exception on failure rather than hanging. -### Future requirements +### Open questions and future requirements -The design should accomodate the following extensions in the near future. +The design should address the following topics in the near future. - EXPORT PARTITION for MergeTree tables. Must work without Keeper installation. - Export history. Provide a system table to track the history of part exports. +- Flexible casting that addresses issues like the following. 
+ - Handling potentially lossy casts like INSERT SELECT: int64 -> int32. + - Export to tables that are missing columns. + - How to map column names--by position or by name? (e.g, is id, name, age compatible with id, age, name)? ### Out of scope requirements @@ -137,7 +139,7 @@ The design should accomodate the following extensions in the near future. iterations may add new output file formats. - Exporting to arbitrary table functions. Only those backed by an object-storage engine that supports exports (e.g. `s3`, `azure`) are valid; others throw `NOT_IMPLEMENTED`. -- Iceberg schema or partition-spec evolution at commit time. Not supported. The source +- Non-matching Iceberg schema, sorting or partitioning. Not supported. The source `MergeTree` schema and partition keys must be compatible with the destination Iceberg table's current `schema-id` and `partition-spec-id`. Destination partition values are derived directly from the source part's partition key; we do not recompute them @@ -157,7 +159,7 @@ The design should accomodate the following extensions in the near future. when reading multiple fields. governed by the destination table's Iceberg partition spec instead. - No change to `MergeTree` on-disk part format; only the Keeper schema under the table's - replication path is extended. + replication path is extended. The extension is tranparent to users. ### References @@ -188,7 +190,9 @@ Output shape depends on the destination family: - **Plain object storage.** One Parquet data file per part (or per chunk when split by size/rows) plus one commit file per transaction — `//_..parquet` - and `/commit_<...>`. Readers that want atomicity filter by commit. + and `/commit_<...>`. Readers that want atomicity filter by commit. This is the default path, + but it can be customized to handle sharding, which is not covered by the default case. See the + `` - **Iceberg destination.** One Parquet data file per part (or per chunk) plus a sibling Avro statistics sidecar `_clickhouse_export_part_sidecar.avro` carrying @@ -255,7 +259,10 @@ SETTINGS allow_experimental_export_merge_tree_part = 1, -- (See note on settings below. Iceberg table engine now has built-in -- settings for Parquet files.) --- Partition export across a Replicated cluster +-- Partition export across a Replicated cluster. This currently +-- selects the parts on the replica that receives the plan. This +-- means the result may vary if new parts are arriving on other +-- replicas. ALTER TABLE rmt_table EXPORT PARTITION ID '2020' TO TABLE s3_table; -- Cancel by filter. The WHERE uses the same filter used to read from `system.replicated_partition_exports`. @@ -347,6 +354,8 @@ registration step. The example uses `IcebergS3` without a catalog; swap in `DatabaseIceberg` or an `iceberg(...)` catalog-backed table function to route through REST / Glue / Unity. +**Open Issue** We need to specify how to commit via a catalog. + ```sql -- 1. Destination: an Iceberg table backed by S3. No catalog required -- for this form; the warehouse metadata.json pointer is managed @@ -363,7 +372,7 @@ PARTITION BY year; -- 2. Export the 2024 partition. Returns immediately; runs in the background. ALTER TABLE events EXPORT PARTITION ID '2024' TO TABLE events_iceberg; --- 3. Watch progress. +-- 3. Watch progress. You can use the system.exports table for this. 
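-- (Added sketch.) While the export is running, in-flight per-part progress also
-- shows up in the local `system.exports` table; rows are dropped on completion,
-- so poll it only for live exports.
SELECT * FROM system.exports;

-- Cluster-wide, Keeper-backed status: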
SELECT status, parts_count, parts_to_do, last_exception FROM system.replicated_partition_exports WHERE source_table = 'events' AND partition_id = '2024'; @@ -416,7 +425,9 @@ The following notes expand on expected behavior of commands. 4. Re-issuing the same `EXPORT PARTITION` within `export_merge_tree_partition_manifest_ttl` is a no-op (no duplicate files) unless `export_merge_tree_partition_force_export = 1`. This - behavior avoids accidentally exporting the same data twice. + behavior avoids accidentally exporting the same data twice. Note, however + that forcing the operation is dangerous if ClickHouse can't clean up the + previous operation. In this case you'll potentially commit files twice. 5. Killing an in-flight partition export via `KILL EXPORT PARTITION` transitions status to `KILLED` and stops all replicas' contributions. @@ -472,7 +483,7 @@ The following notes expand on expected behavior of commands. | `export_merge_tree_partition_max_retries` | query | `3` | `UInt64` | `EXPORT PARTITION` | Retry budget applied to both per-part export attempts and per-task commit attempts (Iceberg). The task fails terminally if commit retries alone exceed the budget. | | `export_merge_tree_partition_manifest_ttl` | query | `180` (seconds) | `UInt64` | `EXPORT PARTITION` | Live-manifest TTL; acts as the idempotency window. Does not interrupt in-flight tasks. Keep this greater than `export_merge_tree_partition_task_timeout_seconds` if you want the `KILLED` entry to remain visible in `system.replicated_partition_exports` after the timeout fires. | | `export_merge_tree_partition_task_timeout_seconds` | query | `3600` (seconds) | `UInt64` (`0`=disable) | `EXPORT PARTITION` | Wall-clock cap for `PENDING` tasks; on expiry transitions to `KILLED` with a timeout reason. Measured from manifest `create_time`. Enforcement latency ≈ one manifest-updater poll cycle (~30s) plus Keeper watch propagation. | -| `export_merge_tree_partition_system_table_prefer_remote_information` | query | `false` | `Bool` | `EXPORT PARTITION` | When `true`, `system.replicated_partition_exports` fetches fresh state from Keeper (requires the `MULTI_READ` feature flag); when `false`, uses local cached state. **Default flipped from `true` to `false` in this release** — Keeper round-trips were more expensive than warranted for the typical observability workload. | +| `export_merge_tree_partition_system_table_prefer_remote_information` | query | `false` | `Bool` | `EXPORT PARTITION` | When `true`, `system.replicated_partition_exports` fetches fresh state from Keeper (requires the `MULTI_READ` feature flag); when `false`, uses local cached state. **Default flipped from `true` to `false` in this release** — Keeper round-trips were more expensive than warranted for the typical observability workload. (See NOTE 2.)| | `export_merge_tree_part_file_already_exists_policy` | query | `skip` | `skip` / `error` / `overwrite` | `EXPORT PARTITION` | Per-file policy during partition export. | Default-value impact: all new settings default to "off" or to @@ -484,8 +495,11 @@ now serves local cached state by default instead of querying Keeper. Users who relied on always-fresh results must set it back to `true` explicitly. 
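Where a single monitoring query genuinely needs fresh state, the flag can be re-enabled at
query scope instead of globally. A sketch, assuming the Keeper `MULTI_READ` feature flag is
available on the ensemble:

```sql
SELECT status, parts_count, parts_to_do, last_exception
FROM system.replicated_partition_exports
WHERE source_table = 'events' AND partition_id = '2024'
SETTINGS export_merge_tree_partition_system_table_prefer_remote_information = 1;
```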
-NOTE 1: EXPORT should also observe the following existing settings for export to Iceberg / Parquet: -- `iceberg_insert_max_bytes_in_data_file` (superseded by `export_merge_tree_part_max_bytes_per_file` if specified) +**NOTE 1:** `export_merge_tree_part_max_bytes_per_file` overrides `iceberg_insert_max_bytes_in_data_file` or +other more specific file size parameters. + +EXPORT should also observe the following existing settings for export to Iceberg / Parquet: +- `iceberg_insert_max_bytes_in_data_file` (as above, overridden by `export_merge_tree_part_max_bytes_per_file` if specified) - `iceberg_insert_max_rows_in_data_file` - `output_format_parquet_row_group_size` - `output_format_parquet_row_group_size_bytes` @@ -493,6 +507,9 @@ NOTE 1: EXPORT should also observe the following existing settings for export to - `output_format_parquet_compression_method` - `output_format_parquet_version` +**NOTE 2:** `export_merge_tree_partition_system_table_prefer_remote_information` may be dropped. +Querying Keeper from the user path is complex and has side effects. + ### System tables / metrics / log messages / observability - `system.exports` — rows for currently-executing part exports (source/destination tables, @@ -557,236 +574,8 @@ task row; they do not crash the server. ## 3. Implementation -### Architecture overview -Parser adds two new `ASTAlterCommand` variants (`EXPORT_PART`, `EXPORT_PARTITION`) plus -`KILL EXPORT PARTITION`. The interpreter side routes `EXPORT PART` into a new per-part export -task scheduled on the background export pool; `EXPORT PARTITION` is routed through Keeper for -coordination and expands into N per-part tasks across the replicas that host each part. - -Part-level pipeline: reuse the existing Parquet output stack (`StorageS3Sink` / Parquet writer) -fed by a source that streams a single part in primary-key order, with no post-read sort or merge -pass. - -Partition coordination: Keeper nodes under `/partition_exports//` hold the -manifest (parts list, policy), per-part assignment, per-replica progress, and the kill flag. -Each replica watches `partition_exports` and picks up parts it holds locally. - -For Iceberg destinations the part-level pipeline writes a Parquet data file plus a sibling -Avro statistics sidecar through `IcebergMetadata::MultipleFileWriter` (extended to compute -per-file stats: `record_count`, `file_size_in_bytes`, `column_sizes`, `null_value_counts`, -Iceberg-serialized lower/upper bounds). The per-part task does not touch any Iceberg -metadata. On the final commit step — once per transaction, on the initiating replica for -partition exports; at end-of-part for single-part exports — the coordinator calls the new -`IDataLakeMetadata::commitExportPartitionTransaction`, which reads every sidecar, -assembles a manifest and manifest list, writes a new `metadata.json`, and CAS-swaps the -catalog pointer (or the warehouse `metadata.json` pointer when catalog-less). The Iceberg -`metadata.json` snapshot the transaction was opened against is stashed in the Keeper -manifest so any surviving replica can complete the commit. - -### Key design decisions - -1. **Separate AST nodes for each command.** `EXPORT PART` and `EXPORT PARTITION` get distinct - `ASTAlterCommand::Type` variants; `KILL EXPORT PARTITION` is its own `ASTKillExportPartitionQuery`. - The part primitive and the cluster-coordinated partition command do materially different work - and should not share a single code path. - -2. 
**Reuse the existing object-storage sink.** Export rides on the destination engine's existing - Parquet writer (`StorageS3` / `StorageAzureBlob` sink). No new encoder. The sink is extended - to accept an already-ordered stream and a target filename derived from - `export_merge_tree_part_filename_pattern`. Iceberg destinations route through the same - Parquet writer but via `IcebergMetadata::MultipleFileWriter`, which layers per-file - statistics collection on top. Commit is the Iceberg-specific part — it does not reuse the - object-storage sink's commit-file path and goes through - `IDataLakeMetadata::commitExportPartitionTransaction` instead (see decision #9). - -3. **Stream parts in primary-key order, no re-`SELECT`.** The per-part reader walks the part in - its on-disk order and feeds the Parquet writer directly, skipping the analyzer / planner / - executor decode and sort path that `INSERT INTO ... SELECT` would take. This is the central - performance claim. - -4. **Coordinate partition exports via a dedicated Keeper subtree.** New path - `/partition_exports//` holds the manifest, per-part assignment, per-replica - progress, and the kill flag — separate from the replication log. The replication log is for - data mutations that must apply on every replica; partition exports are a distributed - side-effect whose assignment depends on which replica holds which part, so they warrant their - own subtree. - -5. **Atomicity depends on destination family.** - - *Plain object storage:* each transaction emits one - `commit__` (part level) or - `commit__` (partition level) file listing - every data file written. Readers wanting atomicity filter by - commit; this avoids on-target renames or multipart-transaction - protocols on object storage. - - *Iceberg destinations:* atomicity is native. The commit writes a - new Iceberg snapshot whose manifest summary embeds - `clickhouse.export-partition-transaction-id`. The pointer swap is - CAS-atomic at the catalog (or warehouse) level, so readers see - the snapshot in its entirety or not at all. The transaction id is - re-read before every commit attempt (see decision #11). - -6. **Async model with three observability surfaces.** Commands return immediately. In-flight - progress lives in `system.exports` (local, dropped on completion); - `system.replicated_partition_exports` (Keeper-backed — querying is a Keeper round-trip, use - sparingly); and `system.part_log` gains an `ExportPart` `event_type` for completed per-part - exports. Four `ProfileEvents` (`PartsExports`, `PartsExportFailures`, `PartsExportDuplicated`, - `PartsExportTotalMilliseconds`) expose aggregate counters. - -7. **Idempotency enforced in Keeper.** Duplicate `(source, destination, partition_id)` submissions - are refused while the manifest is live. The manifest TTL - (`export_merge_tree_partition_manifest_ttl`, default 180s) defines the idempotency window; it - does NOT terminate in-flight tasks. `export_merge_tree_partition_force_export = 1` overrides. - -8. **Two experimental gates, asymmetric scope.** `EXPORT PART` is gated query-level - (`allow_experimental_export_merge_tree_part`) — individual users can try it. `EXPORT PARTITION` - is gated server-level (`allow_experimental_export_merge_tree_partition_feature`) because it - writes to Keeper and engages cluster coordination — rollout is an operator decision, not a - per-query one. - -9. 
**One commit abstraction for data-lake destinations.** `IDataLakeMetadata` gains a - `commitExportPartitionTransaction` virtual (default implementation throws - `NOT_IMPLEMENTED`; `IcebergMetadata` overrides). Callers pass catalog, table id, - transaction id, schema id, partition-spec id, partition values, and the list of data - file paths. Sidecars are discovered by filename convention relative to each data file - path. Future data-lake backends (Delta, Hudi) can override; non-overriding backends - short-circuit to the plain-object-storage commit-file path at the caller. - -10. **Per-file Iceberg statistics live in object-storage sidecars, not in Keeper or memory.** - A partition export is long-running and must survive node restarts. Column stats computed - during write cannot be recovered cheaply at commit time (would require re-reading every - Parquet file), and Keeper is not sized for one-znode-per-file with a stats payload. The - sidecar (`_clickhouse_export_part_sidecar.avro`, schema - `data_file_sidecar_schema` in `AvroSchema.h`) is the persistence mechanism: each - per-part task writes one sidecar next to its data file; the commit step reads all - sidecars in one pass. ClickHouse does not reap sidecars; users may delete them after - `COMPLETED`. - -11. **Two layers of commit idempotency for Iceberg destinations.** Layer 1 (existing, - Keeper): duplicate `(source, destination, partition_id)` submissions are refused while - the manifest is live. Layer 2 (new, Iceberg manifest): the transaction id is written - into every commit as the `clickhouse.export-partition-transaction-id` summary field and - re-checked before the next commit attempt. Layer 1 prevents two concurrent submissions - from racing; Layer 2 protects a single task across its own crash-retry boundary (a - post-commit / pre-Keeper-status-update crash would otherwise double-commit when the - recovered initiator retries). - -### Concurrency / locking - -- Per-part export holds the part's `DataPartStorage` lock for the duration of the read (same - guarantees as a merge/mutation read). -- Partition-export coordinator uses Keeper multi-transactions to (a) claim a part, (b) record - progress, (c) decrement `parts_to_do`, (d) transition status. -- No server-wide lock. Background export pool size is bounded (reuse the existing - `background_pool_size` knob or add a dedicated one — TBD). -- Idempotency against duplicate submission is enforced at the Keeper manifest level (unique - `(source, destination, partition_id)` while manifest is live). -- Commit-attempt counter for Iceberg destinations is a Keeper znode - (`/commit_attempts`) and shares the `export_merge_tree_partition_max_retries` - budget with per-part retries. Exhausting either path terminates the task. -- Manifest-updater status-drain invariant: the status queue is drained holding the status - lock only, never nested inside the export-partition lock. The earlier nested-lock pattern - caused a race window where a replica could miss status transitions for a sibling task; - this invariant is load-bearing for `system.replicated_partition_exports` freshness and - for `KILL EXPORT PARTITION` propagation. - -### Storage format changes - -- **On-disk parts:** unchanged. -- **Keeper:** adds `/partition_exports/` subtree. 
The per-task manifest carries - the parts list, policy, per-replica progress, kill flag, `task_timeout_seconds`, - `commit_attempts`, and — for Iceberg destinations — the source `metadata.json` snapshot - the transaction was opened against plus the `write_full_path_in_iceberg_metadata` flag, - so any surviving replica can complete the commit without re-reading catalog state. Older - servers ignore unknown Keeper children; no schema version bump required but coordinator - code MUST be tolerant of concurrent removal by another version (TBD — verify). -- **Object-storage layout (plain OS):** - `//..` plus - `/commit_` (part-level) or - `/commit__` (partition-level). Readers that don't - understand commit files will see the data files directly — callers wanting atomicity - MUST filter by commit. -- **Object-storage layout (Iceberg):** - `/data//..parquet` plus a sibling - `/data//..parquet_clickhouse_export_part_sidecar.avro`, - plus the standard Iceberg `/metadata/v.metadata.json`, - `/metadata/snap-*.avro`, and `/metadata/.avro`. - Sidecars are unreferenced from any Iceberg manifest and are ClickHouse-private. - -### Performance - -- Hot path: Parquet encoding of a single part. No extra `SELECT` / sort / merge pass vs. - `INSERT ... SELECT` baseline — that is the expected win. -- Memory: one Parquet writer per concurrent export; row-group buffering bounded by - `output_format_parquet_row_group_size_bytes`. -- I/O: one network write stream per output file; chunked when - `export_merge_tree_part_max_bytes_per_file` / `_max_rows_per_file` set. -- Benchmark coverage: TBD — propose a `tests/performance/export_merge_tree_part.xml` comparing - `EXPORT PART` vs. `INSERT INTO s3_t SELECT FROM mt_t WHERE _part = ...` over a ~1 GB part. -- Commit cost for Iceberg destinations: linear in the number of data files. One sidecar - read + one Avro manifest write + one `metadata.json` write + one catalog CAS. For a - partition with thousands of parts, this is seconds of extra work on top of the per-part - write phase — bounded, not per-row. -- No extra commit cost for plain-object-storage destinations beyond the single commit-file - write. - -### Alternatives considered - -1. `INSERT INTO s3_t SELECT FROM mt_t WHERE _part = 'p'` — today's workaround. Rejected - because it runs the full `SELECT` pipeline (decode, potential re-sort, distribute) per export, - has no cross-replica coordination, and no native commit-file atomicity. -2. **Synchronous `ALTER ... EXPORT PART` that blocks the client** — rejected; partition exports - can run for hours and the HTTP / native session would time out. Async + system tables mirrors - `ALTER ... MUTATE` and is already familiar. -3. **Non-replicated `EXPORT PARTITION` (per-replica, uncoordinated)** — rejected because - duplicates and split-brain are the default outcome when every replica independently exports - the parts it holds. -4. **Queue the partition export in the existing replication log** — rejected; the replication - log is for *data* mutations that must apply on every replica, whereas partition exports are - a distributed *side-effect* whose assignment depends on which replica holds which part. - Separate Keeper subtree is cleaner. -5. **Non-Keeper coordination (leader replica drives everything)** — rejected; would require a - new leader-election path and wouldn't survive leader restart without a Keeper-backed manifest - anyway. -6. **Per-file Iceberg stats in Keeper** — rejected. 
One znode per data file with a stats
-   payload inflates Keeper state by the size of the dataset being exported; Keeper is not
-   object storage.
-7. **Commit each Iceberg part individually as its own snapshot** — rejected. Iceberg
-   best-practice is one snapshot per logical transaction; per-part snapshots produce
-   snapshot-log churn and make every reader's planning scan O(parts) instead of O(1).
-
-### Open questions
-
-- Exact error codes for each failure class above — confirm names in
-  `src/Common/ErrorCodes.cpp` during prototype.
-- Whether `EXPORT PART` should refuse to run against `Replicated*MergeTree` (forcing users to
-  `EXPORT PARTITION`) or remain allowed as the primitive it clearly is. Current tests allow
-  both; this should be documented explicitly.
-- Dedicated background pool for exports vs. reuse of existing `background_move_pool_size` /
-  similar — TBD.
-- Behaviour of `EXPORT PARTITION` when initiating replica dies mid-task: the manifest persists
-  in Keeper, but does a surviving replica take over as "source replica"? Current docs say "task
-  is persistent" — clarify recovery semantics.
-- Is the manifest TTL enforced by the initiator or by a cluster-wide cleanup job? Affects what
-  happens when the initiator is offline.
-- `EXPORT PART` currently runs its commit step inline (per-part), whereas `EXPORT PARTITION`
-  defers commit to the coordinator. The PR author flagged this asymmetry for rethinking —
-  should single-part export be a degenerate 1-part partition export internally, sharing the
-  commit primitive? Would simplify `IcebergMetadata` but adds a Keeper dependency to the
-  part-level path.
-- `export_merge_tree_part_max_bytes_per_file` vs. `iceberg_insert_max_bytes_in_data_file`:
-  remove the former for Iceberg destinations in favor of the Iceberg-native setting, or
-  keep both for per-destination-family control?
-- `IcebergImportSink` is introduced alongside the export path in this PR — is
-  `INSERT INTO iceberg_t` (regular write, not `ALTER TABLE EXPORT`) in scope for this
-  design or a sibling one?
-- Should non-Iceberg data-lake backends (Delta, Hudi) fall back to the plain-object-storage
-  commit-file path when they do not override `commitExportPartitionTransaction`, or throw
-  `NOT_IMPLEMENTED`? Current default throws; falling back would mean exported data is
-  usable but not registered as a lake-format snapshot.
-
----
-
-## 4. Test plan
+This design only covers user-visible behavior. It does not cover internal implementation
+details. The implementation section is omitted.
### Functional tests — `tests/queries/0_stateless`
diff --git a/antalya/skills/antalya-feature-design/assets/design-template.md b/antalya/skills/antalya-feature-design/assets/design-template.md
index 08b228ad9394..2f35e9a6af2b 100644
--- a/antalya/skills/antalya-feature-design/assets/design-template.md
+++ b/antalya/skills/antalya-feature-design/assets/design-template.md
@@ -79,6 +79,14 @@ Highlight compatibility issues, for example:

## 3. Implementation

+This section may be omitted by the user. In this case the section should be left empty
+and replaced with a message like the following.
+
+"The specification only covers user-visible behavior, hence this section is omitted."
+
+Optionally, also include references to other documents, GitHub issues, or code if
+available and relevant.
+
### Architecture overview

One paragraph, plus a diagram if it helps.
Where does the feature live in the codebase, and how does it connect to existing subsystems (parser → analyzer → planner → executor → storage)?

From ac2ebc02812132d2bc6d4287b0405153b79533df Mon Sep 17 00:00:00 2001
From: Robert Hodges
Date: Sat, 25 Apr 2026 21:45:57 -0700
Subject: [PATCH 5/5] Fixed typo in design doc

Signed-off-by: Robert Hodges
---
 antalya/docs/design/alter-table-export-part-partition.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/antalya/docs/design/alter-table-export-part-partition.md b/antalya/docs/design/alter-table-export-part-partition.md
index 2b923064e361..d513e4b80e41 100644
--- a/antalya/docs/design/alter-table-export-part-partition.md
+++ b/antalya/docs/design/alter-table-export-part-partition.md
@@ -577,6 +577,8 @@ task row; they do not crash the server.

This design only covers user-visible behavior. It does not cover internal implementation
details. The implementation section is omitted.
+## 4. Test plan
+
### Functional tests — `tests/queries/0_stateless`

Existing coverage to retain: