diff --git a/crates/integrations/datafusion/src/system_tables/mod.rs b/crates/integrations/datafusion/src/system_tables/mod.rs index 96b038b3..c0064de1 100644 --- a/crates/integrations/datafusion/src/system_tables/mod.rs +++ b/crates/integrations/datafusion/src/system_tables/mod.rs @@ -32,6 +32,7 @@ use crate::error::to_datafusion_error; mod options; mod schemas; mod snapshots; +mod tags; type Builder = fn(Table) -> DFResult>; @@ -39,6 +40,7 @@ const TABLES: &[(&str, Builder)] = &[ ("options", options::build), ("schemas", schemas::build), ("snapshots", snapshots::build), + ("tags", tags::build), ]; /// Parse a Paimon object name into `(base_table, optional system_table_name)`. @@ -126,6 +128,9 @@ mod tests { assert!(is_registered("schemas")); assert!(is_registered("Schemas")); assert!(is_registered("SCHEMAS")); + assert!(is_registered("tags")); + assert!(is_registered("Tags")); + assert!(is_registered("TAGS")); assert!(!is_registered("nonsense")); } diff --git a/crates/integrations/datafusion/src/system_tables/tags.rs b/crates/integrations/datafusion/src/system_tables/tags.rs new file mode 100644 index 00000000..1559b675 --- /dev/null +++ b/crates/integrations/datafusion/src/system_tables/tags.rs @@ -0,0 +1,134 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Mirrors Java [TagsTable](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java). + +use std::any::Any; +use std::sync::{Arc, OnceLock}; + +use async_trait::async_trait; +use datafusion::arrow::array::{ + new_null_array, Int64Array, RecordBatch, StringArray, TimestampMillisecondArray, +}; +use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}; +use datafusion::catalog::Session; +use datafusion::datasource::memory::MemorySourceConfig; +use datafusion::datasource::{TableProvider, TableType}; +use datafusion::error::Result as DFResult; +use datafusion::logical_expr::Expr; +use datafusion::physical_plan::ExecutionPlan; +use paimon::table::{Table, TagManager}; + +use crate::error::to_datafusion_error; + +pub(super) fn build(table: Table) -> DFResult> { + Ok(Arc::new(TagsTable { table })) +} + +fn tags_schema() -> SchemaRef { + static SCHEMA: OnceLock = OnceLock::new(); + SCHEMA + .get_or_init(|| { + Arc::new(Schema::new(vec![ + Field::new("tag_name", DataType::Utf8, false), + Field::new("snapshot_id", DataType::Int64, false), + Field::new("schema_id", DataType::Int64, false), + Field::new( + "commit_time", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + Field::new("record_count", DataType::Int64, true), + Field::new( + "create_time", + DataType::Timestamp(TimeUnit::Millisecond, None), + true, + ), + Field::new("time_retained", DataType::Utf8, true), + ])) + }) + .clone() +} + +#[derive(Debug)] +struct TagsTable { + table: Table, +} + +#[async_trait] +impl TableProvider for TagsTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + tags_schema() + } + + fn table_type(&self) -> TableType { + TableType::View + } + + async fn scan( + &self, + _state: &dyn Session, + projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> DFResult> { + let tm = TagManager::new( + self.table.file_io().clone(), + self.table.location().to_string(), + ); + let tags = tm.list_all().await.map_err(to_datafusion_error)?; + + let n = tags.len(); + let mut tag_names: Vec = Vec::with_capacity(n); + let mut snapshot_ids = Vec::with_capacity(n); + let mut schema_ids = Vec::with_capacity(n); + let mut commit_times = Vec::with_capacity(n); + let mut record_counts: Vec> = Vec::with_capacity(n); + + for (name, snap) in tags { + tag_names.push(name); + snapshot_ids.push(snap.id()); + schema_ids.push(snap.schema_id()); + commit_times.push(snap.time_millis() as i64); + record_counts.push(snap.total_record_count()); + } + + let schema = tags_schema(); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(StringArray::from(tag_names)), + Arc::new(Int64Array::from(snapshot_ids)), + Arc::new(Int64Array::from(schema_ids)), + Arc::new(TimestampMillisecondArray::from(commit_times)), + Arc::new(Int64Array::from(record_counts)), + new_null_array(&DataType::Timestamp(TimeUnit::Millisecond, None), n), + new_null_array(&DataType::Utf8, n), + ], + )?; + + Ok(MemorySourceConfig::try_new_exec( + &[vec![batch]], + schema, + projection.cloned(), + )?) + } +} diff --git a/crates/integrations/datafusion/tests/system_tables.rs b/crates/integrations/datafusion/tests/system_tables.rs index f9c3378c..b5c23d9d 100644 --- a/crates/integrations/datafusion/tests/system_tables.rs +++ b/crates/integrations/datafusion/tests/system_tables.rs @@ -331,3 +331,85 @@ async fn test_snapshots_system_table() { "unexpected commit_kind: {kind}" ); } + +#[tokio::test] +async fn test_tags_system_table_empty_when_no_tag_dir() { + let (ctx, _catalog, _tmp) = create_context().await; + let sql = format!("SELECT * FROM paimon.default.{FIXTURE_TABLE}$tags"); + let batches = run_sql(&ctx, &sql).await; + + // Schema must be present even with zero rows. + assert!(!batches.is_empty(), "$tags should return ≥1 batch"); + let arrow_schema = batches[0].schema(); + let expected_columns = [ + ("tag_name", DataType::Utf8), + ("snapshot_id", DataType::Int64), + ("schema_id", DataType::Int64), + ( + "commit_time", + DataType::Timestamp(TimeUnit::Millisecond, None), + ), + ("record_count", DataType::Int64), + ( + "create_time", + DataType::Timestamp(TimeUnit::Millisecond, None), + ), + ("time_retained", DataType::Utf8), + ]; + for (i, (name, dtype)) in expected_columns.iter().enumerate() { + let field = arrow_schema.field(i); + assert_eq!(field.name(), name, "column {i} name"); + assert_eq!(field.data_type(), dtype, "column {i} type"); + } + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 0, "fixture has no tag dir, expected 0 rows"); +} + +#[tokio::test] +async fn test_tags_system_table_with_seeded_tags() { + let (ctx, catalog, tmp) = create_context().await; + + let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string()); + let table = catalog.get_table(&identifier).await.unwrap(); + let sm = + paimon::table::SnapshotManager::new(table.file_io().clone(), table.location().to_string()); + let earliest = sm.list_all().await.unwrap().into_iter().next().unwrap(); + + let table_dir = tmp.path().join("default.db").join(FIXTURE_TABLE); + let tag_dir = table_dir.join("tag"); + std::fs::create_dir_all(&tag_dir).expect("create tag dir"); + let src = table_dir + .join("snapshot") + .join(format!("snapshot-{}", earliest.id())); + std::fs::copy(&src, tag_dir.join("tag-v1")).unwrap(); + std::fs::copy(&src, tag_dir.join("tag-v2")).unwrap(); + + let sql = format!("SELECT tag_name, snapshot_id FROM paimon.default.{FIXTURE_TABLE}$tags"); + let batches = run_sql(&ctx, &sql).await; + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 2, "expected two seeded tags"); + + let mut names: Vec = Vec::new(); + let mut snap_ids: Vec = Vec::new(); + for batch in &batches { + let names_col = batch + .column(0) + .as_any() + .downcast_ref::() + .expect("tag_name is Utf8"); + let snap_col = batch + .column(1) + .as_any() + .downcast_ref::() + .expect("snapshot_id is Int64"); + for i in 0..batch.num_rows() { + names.push(names_col.value(i).to_string()); + snap_ids.push(snap_col.value(i)); + } + } + let mut sorted_names = names.clone(); + sorted_names.sort(); + assert_eq!(names, sorted_names, "tag_name should be ascending"); + assert_eq!(names, vec!["v1".to_string(), "v2".to_string()]); + assert_eq!(snap_ids, vec![earliest.id(), earliest.id()]); +} diff --git a/crates/paimon/src/table/tag_manager.rs b/crates/paimon/src/table/tag_manager.rs index a907a8a9..6c38499f 100644 --- a/crates/paimon/src/table/tag_manager.rs +++ b/crates/paimon/src/table/tag_manager.rs @@ -22,6 +22,8 @@ use crate::io::FileIO; use crate::spec::Snapshot; +use futures::future::try_join_all; +use opendal::raw::get_basename; const TAG_DIR: &str = "tag"; const TAG_PREFIX: &str = "tag-"; @@ -86,4 +88,114 @@ impl TagManager { })?; Ok(Some(snapshot)) } + + /// List all tag names sorted ascending. Returns an empty vector when the + /// tag directory does not exist. + pub async fn list_all_names(&self) -> crate::Result> { + let tag_dir = self.tag_directory(); + let statuses = match self.file_io.list_status(&tag_dir).await { + Ok(s) => s, + Err(crate::Error::IoUnexpected { ref source, .. }) + if source.kind() == opendal::ErrorKind::NotFound => + { + return Ok(Vec::new()); + } + Err(e) => return Err(e), + }; + let mut names: Vec = statuses + .into_iter() + .filter(|s| !s.is_dir) + .filter_map(|s| { + get_basename(&s.path) + .strip_prefix(TAG_PREFIX) + .map(str::to_string) + }) + .collect(); + names.sort_unstable(); + Ok(names) + } + + /// List all tags as `(name, snapshot)` pairs sorted by name ascending. + pub async fn list_all(&self) -> crate::Result> { + let names = self.list_all_names().await?; + try_join_all(names.into_iter().map(|name| async move { + let snap = self + .get(&name) + .await? + .ok_or_else(|| crate::Error::DataInvalid { + message: format!("tag '{name}' disappeared during listing"), + source: None, + })?; + Ok::<_, crate::Error>((name, snap)) + })) + .await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::io::FileIOBuilder; + use crate::spec::CommitKind; + use bytes::Bytes; + + fn test_file_io() -> FileIO { + FileIOBuilder::new("memory").build().unwrap() + } + + fn test_snapshot(id: i64) -> Snapshot { + Snapshot::builder() + .version(3) + .id(id) + .schema_id(0) + .base_manifest_list("base-list".to_string()) + .delta_manifest_list("delta-list".to_string()) + .commit_user("test-user".to_string()) + .commit_identifier(0) + .commit_kind(CommitKind::APPEND) + .time_millis(1000 * id as u64) + .build() + } + + async fn write_tag(file_io: &FileIO, tm: &TagManager, name: &str, snapshot: &Snapshot) { + let path = tm.tag_path(name); + let json = serde_json::to_string(snapshot).unwrap(); + let output = file_io.new_output(&path).unwrap(); + output.write(Bytes::from(json)).await.unwrap(); + } + + #[tokio::test] + async fn test_list_all_names_missing_dir_returns_empty() { + let file_io = test_file_io(); + let tm = TagManager::new(file_io, "memory:/test_tag_missing".to_string()); + assert!(tm.list_all_names().await.unwrap().is_empty()); + assert!(tm.list_all().await.unwrap().is_empty()); + } + + #[tokio::test] + async fn test_list_all_names_sorted() { + let file_io = test_file_io(); + let table_path = "memory:/test_tag_sorted".to_string(); + file_io.mkdirs(&format!("{table_path}/tag/")).await.unwrap(); + let tm = TagManager::new(file_io.clone(), table_path); + for name in ["v3", "v1", "v2"] { + write_tag(&file_io, &tm, name, &test_snapshot(1)).await; + } + assert_eq!(tm.list_all_names().await.unwrap(), vec!["v1", "v2", "v3"]); + } + + #[tokio::test] + async fn test_list_all_loads_pairs() { + let file_io = test_file_io(); + let table_path = "memory:/test_tag_pairs".to_string(); + file_io.mkdirs(&format!("{table_path}/tag/")).await.unwrap(); + let tm = TagManager::new(file_io.clone(), table_path); + write_tag(&file_io, &tm, "a", &test_snapshot(1)).await; + write_tag(&file_io, &tm, "b", &test_snapshot(2)).await; + let pairs = tm.list_all().await.unwrap(); + let names: Vec<&str> = pairs.iter().map(|(n, _)| n.as_str()).collect(); + let ids: Vec = pairs.iter().map(|(_, s)| s.id()).collect(); + assert_eq!(names, vec!["a", "b"]); + assert_eq!(ids, vec![1, 2]); + } }