Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions crates/integrations/datafusion/src/system_tables/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ use crate::error::to_datafusion_error;
mod options;
mod schemas;
mod snapshots;
mod tags;

/// Constructor signature shared by every system-table provider: given the
/// base Paimon table, build the `TableProvider` for one `$name` suffix.
type Builder = fn(Table) -> DFResult<Arc<dyn TableProvider>>;

/// Registry of supported system tables, keyed by lower-case suffix
/// (lookups elsewhere normalize case — see the `is_registered` tests).
const TABLES: &[(&str, Builder)] = &[
    ("options", options::build),
    ("schemas", schemas::build),
    ("snapshots", snapshots::build),
    ("tags", tags::build),
];

/// Parse a Paimon object name into `(base_table, optional system_table_name)`.
Expand Down Expand Up @@ -126,6 +128,9 @@ mod tests {
assert!(is_registered("schemas"));
assert!(is_registered("Schemas"));
assert!(is_registered("SCHEMAS"));
assert!(is_registered("tags"));
assert!(is_registered("Tags"));
assert!(is_registered("TAGS"));
assert!(!is_registered("nonsense"));
}

Expand Down
134 changes: 134 additions & 0 deletions crates/integrations/datafusion/src/system_tables/tags.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Mirrors Java [TagsTable](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java).

use std::any::Any;
use std::sync::{Arc, OnceLock};

use async_trait::async_trait;
use datafusion::arrow::array::{
new_null_array, Int64Array, RecordBatch, StringArray, TimestampMillisecondArray,
};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use datafusion::catalog::Session;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result as DFResult;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::ExecutionPlan;
use paimon::table::{Table, TagManager};

use crate::error::to_datafusion_error;

/// Build the `$tags` system-table provider for the given Paimon table.
pub(super) fn build(table: Table) -> DFResult<Arc<dyn TableProvider>> {
    let provider = TagsTable { table };
    Ok(Arc::new(provider))
}

/// Arrow schema of the `$tags` system table. Built exactly once and shared
/// via a process-wide `OnceLock`; callers receive a cheap `Arc` clone.
fn tags_schema() -> SchemaRef {
    static SCHEMA: OnceLock<SchemaRef> = OnceLock::new();
    Arc::clone(SCHEMA.get_or_init(|| {
        // Both time columns use the same millisecond timestamp type.
        let millis_ts = DataType::Timestamp(TimeUnit::Millisecond, None);
        let fields = vec![
            Field::new("tag_name", DataType::Utf8, false),
            Field::new("snapshot_id", DataType::Int64, false),
            Field::new("schema_id", DataType::Int64, false),
            Field::new("commit_time", millis_ts.clone(), false),
            Field::new("record_count", DataType::Int64, true),
            Field::new("create_time", millis_ts, true),
            Field::new("time_retained", DataType::Utf8, true),
        ];
        Arc::new(Schema::new(fields))
    }))
}

/// `TableProvider` that exposes a Paimon table's tags as rows, one row per
/// tag file found under the table's `tag/` directory.
#[derive(Debug)]
struct TagsTable {
    // The base Paimon table whose tag directory is listed during `scan`.
    table: Table,
}

#[async_trait]
impl TableProvider for TagsTable {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        tags_schema()
    }

    fn table_type(&self) -> TableType {
        TableType::View
    }

    /// Materialize every tag into a single in-memory record batch and serve
    /// it through a memory execution plan.
    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        let manager = TagManager::new(
            self.table.file_io().clone(),
            self.table.location().to_string(),
        );
        let entries = manager.list_all().await.map_err(to_datafusion_error)?;
        let row_count = entries.len();

        // One column builder per non-null column; `list_all` already returns
        // entries sorted by tag name, so rows come out name-ascending.
        let mut names: Vec<String> = Vec::with_capacity(row_count);
        let mut snapshot_ids = Vec::with_capacity(row_count);
        let mut schema_ids = Vec::with_capacity(row_count);
        let mut commit_times = Vec::with_capacity(row_count);
        let mut record_counts: Vec<Option<i64>> = Vec::with_capacity(row_count);
        for (name, snapshot) in entries {
            names.push(name);
            snapshot_ids.push(snapshot.id());
            schema_ids.push(snapshot.schema_id());
            commit_times.push(snapshot.time_millis() as i64);
            record_counts.push(snapshot.total_record_count());
        }

        let schema = tags_schema();
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(names)),
                Arc::new(Int64Array::from(snapshot_ids)),
                Arc::new(Int64Array::from(schema_ids)),
                Arc::new(TimestampMillisecondArray::from(commit_times)),
                Arc::new(Int64Array::from(record_counts)),
                // create_time / time_retained are not tracked here yet; emit
                // all-null columns so the schema matches the Java TagsTable.
                new_null_array(&DataType::Timestamp(TimeUnit::Millisecond, None), row_count),
                new_null_array(&DataType::Utf8, row_count),
            ],
        )?;

        let exec =
            MemorySourceConfig::try_new_exec(&[vec![batch]], schema, projection.cloned())?;
        Ok(exec)
    }
}
82 changes: 82 additions & 0 deletions crates/integrations/datafusion/tests/system_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,3 +331,85 @@ async fn test_snapshots_system_table() {
"unexpected commit_kind: {kind}"
);
}

#[tokio::test]
async fn test_tags_system_table_empty_when_no_tag_dir() {
    let (ctx, _catalog, _tmp) = create_context().await;
    let batches = run_sql(
        &ctx,
        &format!("SELECT * FROM paimon.default.{FIXTURE_TABLE}$tags"),
    )
    .await;

    // Even with zero rows the provider must expose the full 7-column schema.
    assert!(!batches.is_empty(), "$tags should return ≥1 batch");
    let arrow_schema = batches[0].schema();
    let millis_ts = DataType::Timestamp(TimeUnit::Millisecond, None);
    let expected_columns = [
        ("tag_name", DataType::Utf8),
        ("snapshot_id", DataType::Int64),
        ("schema_id", DataType::Int64),
        ("commit_time", millis_ts.clone()),
        ("record_count", DataType::Int64),
        ("create_time", millis_ts),
        ("time_retained", DataType::Utf8),
    ];
    for (i, (name, dtype)) in expected_columns.iter().enumerate() {
        let field = arrow_schema.field(i);
        assert_eq!(field.name(), name, "column {i} name");
        assert_eq!(field.data_type(), dtype, "column {i} type");
    }

    // The fixture table never had a tag directory, so no rows may appear.
    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(total_rows, 0, "fixture has no tag dir, expected 0 rows");
}

#[tokio::test]
async fn test_tags_system_table_with_seeded_tags() {
    let (ctx, catalog, tmp) = create_context().await;

    // Resolve the fixture table so we can locate its snapshot files on disk.
    let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string());
    let table = catalog.get_table(&identifier).await.unwrap();
    let sm =
        paimon::table::SnapshotManager::new(table.file_io().clone(), table.location().to_string());
    let earliest = sm.list_all().await.unwrap().into_iter().next().unwrap();

    // Seed two tags by copying an existing snapshot file into the tag dir.
    // Tag files are read with the snapshot JSON format (TagManager::get
    // deserializes them as Snapshot), so a straight copy is a valid tag.
    let table_dir = tmp.path().join("default.db").join(FIXTURE_TABLE);
    let tag_dir = table_dir.join("tag");
    std::fs::create_dir_all(&tag_dir).expect("create tag dir");
    let src = table_dir
        .join("snapshot")
        .join(format!("snapshot-{}", earliest.id()));
    std::fs::copy(&src, tag_dir.join("tag-v1")).unwrap();
    std::fs::copy(&src, tag_dir.join("tag-v2")).unwrap();

    let sql = format!("SELECT tag_name, snapshot_id FROM paimon.default.{FIXTURE_TABLE}$tags");
    let batches = run_sql(&ctx, &sql).await;
    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(total_rows, 2, "expected two seeded tags");

    // Collect (tag_name, snapshot_id) pairs across all returned batches.
    let mut names: Vec<String> = Vec::new();
    let mut snap_ids: Vec<i64> = Vec::new();
    for batch in &batches {
        let names_col = batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("tag_name is Utf8");
        let snap_col = batch
            .column(1)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("snapshot_id is Int64");
        for i in 0..batch.num_rows() {
            names.push(names_col.value(i).to_string());
            snap_ids.push(snap_col.value(i));
        }
    }

    // "tag-" prefix is stripped, names come back ascending, and both tags
    // must point at the snapshot we copied.
    let mut sorted_names = names.clone();
    sorted_names.sort();
    assert_eq!(names, sorted_names, "tag_name should be ascending");
    assert_eq!(names, vec!["v1".to_string(), "v2".to_string()]);
    assert_eq!(snap_ids, vec![earliest.id(), earliest.id()]);
}
112 changes: 112 additions & 0 deletions crates/paimon/src/table/tag_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

use crate::io::FileIO;
use crate::spec::Snapshot;
use futures::future::try_join_all;
use opendal::raw::get_basename;

const TAG_DIR: &str = "tag";
const TAG_PREFIX: &str = "tag-";
Expand Down Expand Up @@ -86,4 +88,114 @@ impl TagManager {
})?;
Ok(Some(snapshot))
}

/// List every tag name, sorted ascending.
///
/// A missing tag directory is not an error: it simply means the table has
/// never been tagged, so an empty vector is returned.
pub async fn list_all_names(&self) -> crate::Result<Vec<String>> {
    let dir = self.tag_directory();
    let statuses = match self.file_io.list_status(&dir).await {
        Ok(statuses) => statuses,
        // Absent tag dir ⇒ no tags yet; swallow only the NotFound case.
        Err(crate::Error::IoUnexpected { ref source, .. })
            if source.kind() == opendal::ErrorKind::NotFound =>
        {
            return Ok(Vec::new());
        }
        Err(other) => return Err(other),
    };

    let mut names = Vec::with_capacity(statuses.len());
    for status in statuses {
        // Skip sub-directories; only files named `tag-<name>` count.
        if status.is_dir {
            continue;
        }
        if let Some(name) = get_basename(&status.path).strip_prefix(TAG_PREFIX) {
            names.push(name.to_string());
        }
    }
    names.sort_unstable();
    Ok(names)
}

/// List all tags as `(name, snapshot)` pairs sorted by name ascending.
pub async fn list_all(&self) -> crate::Result<Vec<(String, Snapshot)>> {
let names = self.list_all_names().await?;
try_join_all(names.into_iter().map(|name| async move {
let snap = self
.get(&name)
.await?
.ok_or_else(|| crate::Error::DataInvalid {
message: format!("tag '{name}' disappeared during listing"),
source: None,
})?;
Ok::<_, crate::Error>((name, snap))
}))
.await
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::io::FileIOBuilder;
    use crate::spec::CommitKind;
    use bytes::Bytes;

    /// In-memory FileIO so tests touch no real filesystem.
    fn test_file_io() -> FileIO {
        FileIOBuilder::new("memory").build().unwrap()
    }

    /// Minimal valid snapshot with the given id; `time_millis` scales with
    /// the id so snapshots stay distinguishable.
    fn test_snapshot(id: i64) -> Snapshot {
        Snapshot::builder()
            .version(3)
            .id(id)
            .schema_id(0)
            .base_manifest_list("base-list".to_string())
            .delta_manifest_list("delta-list".to_string())
            .commit_user("test-user".to_string())
            .commit_identifier(0)
            .commit_kind(CommitKind::APPEND)
            .time_millis(1000 * id as u64)
            .build()
    }

    /// Serialize `snapshot` as JSON and write it at the tag path for `name`.
    async fn write_tag(file_io: &FileIO, tm: &TagManager, name: &str, snapshot: &Snapshot) {
        let path = tm.tag_path(name);
        let json = serde_json::to_string(snapshot).unwrap();
        let output = file_io.new_output(&path).unwrap();
        output.write(Bytes::from(json)).await.unwrap();
    }

    #[tokio::test]
    async fn test_list_all_names_missing_dir_returns_empty() {
        // No tag directory was ever created: both listings must return
        // empty, not an error.
        let file_io = test_file_io();
        let tm = TagManager::new(file_io, "memory:/test_tag_missing".to_string());
        assert!(tm.list_all_names().await.unwrap().is_empty());
        assert!(tm.list_all().await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_list_all_names_sorted() {
        let file_io = test_file_io();
        let table_path = "memory:/test_tag_sorted".to_string();
        file_io.mkdirs(&format!("{table_path}/tag/")).await.unwrap();
        let tm = TagManager::new(file_io.clone(), table_path);
        // Deliberately written out of order; listing must sort ascending.
        for name in ["v3", "v1", "v2"] {
            write_tag(&file_io, &tm, name, &test_snapshot(1)).await;
        }
        assert_eq!(tm.list_all_names().await.unwrap(), vec!["v1", "v2", "v3"]);
    }

    #[tokio::test]
    async fn test_list_all_loads_pairs() {
        let file_io = test_file_io();
        let table_path = "memory:/test_tag_pairs".to_string();
        file_io.mkdirs(&format!("{table_path}/tag/")).await.unwrap();
        let tm = TagManager::new(file_io.clone(), table_path);
        write_tag(&file_io, &tm, "a", &test_snapshot(1)).await;
        write_tag(&file_io, &tm, "b", &test_snapshot(2)).await;
        // Pairs come back sorted by name with each name's own snapshot.
        let pairs = tm.list_all().await.unwrap();
        let names: Vec<&str> = pairs.iter().map(|(n, _)| n.as_str()).collect();
        let ids: Vec<i64> = pairs.iter().map(|(_, s)| s.id()).collect();
        assert_eq!(names, vec!["a", "b"]);
        assert_eq!(ids, vec![1, 2]);
    }
}
Loading