From 08b4576094b51f8bcf650b4052c1be793691d608 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Thu, 23 Apr 2026 17:56:54 +0800 Subject: [PATCH] feat: add custom Avro OCF reader for manifest parsing with filtered decoding Replace apache-avro's read path (Value intermediate representation) with a zero-copy custom decoder that reads Avro binary directly into target structs. Key optimizations: - Custom AvroCursor for zero-copy Avro binary primitive decoding - OCF parser with snappy/deflate/zstd decompression support - Writer schema parsing with field index mapping for compatibility - Two-pass filtered ManifestEntry decoding: lightweight fields first, skip expensive DataFileMeta for filtered-out entries - SharedSchemaCache for cross-task schema reuse - Zero-copy entry consumption (into_identifier, into_parts) - Parallel base+delta manifest list reads via futures::try_join! - Remove redundant exists() checks before file reads --- crates/paimon/Cargo.toml | 1 + crates/paimon/src/btree/block.rs | 6 +- crates/paimon/src/spec/avro/cursor.rs | 338 +++++++++++++++ crates/paimon/src/spec/avro/decode.rs | 37 ++ crates/paimon/src/spec/avro/decode_helpers.rs | 78 ++++ .../spec/avro/index_manifest_entry_decode.rs | 202 +++++++++ .../src/spec/avro/manifest_entry_decode.rs | 410 ++++++++++++++++++ .../spec/avro/manifest_file_meta_decode.rs | 139 ++++++ crates/paimon/src/spec/avro/mod.rs | 204 +++++++++ crates/paimon/src/spec/avro/ocf.rs | 299 +++++++++++++ crates/paimon/src/spec/avro/schema.rs | 401 +++++++++++++++++ crates/paimon/src/spec/index_manifest.rs | 5 +- crates/paimon/src/spec/manifest.rs | 25 +- crates/paimon/src/spec/manifest_common.rs | 2 +- crates/paimon/src/spec/manifest_entry.rs | 28 +- crates/paimon/src/spec/manifest_file_meta.rs | 21 + crates/paimon/src/spec/manifest_list.rs | 11 +- crates/paimon/src/spec/mod.rs | 1 + crates/paimon/src/table/bucket_filter.rs | 30 +- .../paimon/src/table/data_evolution_reader.rs | 2 +- crates/paimon/src/table/partition_filter.rs | 35 
+- crates/paimon/src/table/stats_filter.rs | 45 +- crates/paimon/src/table/table_commit.rs | 44 +- crates/paimon/src/table/table_scan.rs | 122 +++--- 24 files changed, 2348 insertions(+), 138 deletions(-) create mode 100644 crates/paimon/src/spec/avro/cursor.rs create mode 100644 crates/paimon/src/spec/avro/decode.rs create mode 100644 crates/paimon/src/spec/avro/decode_helpers.rs create mode 100644 crates/paimon/src/spec/avro/index_manifest_entry_decode.rs create mode 100644 crates/paimon/src/spec/avro/manifest_entry_decode.rs create mode 100644 crates/paimon/src/spec/avro/manifest_file_meta_decode.rs create mode 100644 crates/paimon/src/spec/avro/mod.rs create mode 100644 crates/paimon/src/spec/avro/ocf.rs create mode 100644 crates/paimon/src/spec/avro/schema.rs diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml index 6643e3ba..e63b4c6e 100644 --- a/crates/paimon/Cargo.toml +++ b/crates/paimon/Cargo.toml @@ -61,6 +61,7 @@ indexmap = "2.5.0" roaring = "0.11" crc32fast = "1" zstd = "0.13" +snap = "1" arrow-array = { workspace = true } arrow-buffer = { workspace = true } arrow-cast = { workspace = true } diff --git a/crates/paimon/src/btree/block.rs b/crates/paimon/src/btree/block.rs index d3a75a8f..ab2fa7e8 100644 --- a/crates/paimon/src/btree/block.rs +++ b/crates/paimon/src/btree/block.rs @@ -327,11 +327,7 @@ impl BlockReader { BlockAlignedType::Aligned => { let record_size = int_value as usize; let data_len = block.len() - 5; - let record_count = if record_size > 0 { - data_len / record_size - } else { - 0 - }; + let record_count = data_len.checked_div(record_size).unwrap_or(0); block.truncate(data_len); Ok(Self { data: block, diff --git a/crates/paimon/src/spec/avro/cursor.rs b/crates/paimon/src/spec/avro/cursor.rs new file mode 100644 index 00000000..7da394c0 --- /dev/null +++ b/crates/paimon/src/spec/avro/cursor.rs @@ -0,0 +1,338 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::Error; + +/// Zero-copy cursor over Avro binary-encoded data. +pub struct AvroCursor<'a> { + data: &'a [u8], + pos: usize, +} + +impl<'a> AvroCursor<'a> { + #[inline] + pub fn new(data: &'a [u8]) -> Self { + Self { data, pos: 0 } + } + + #[inline] + #[allow(dead_code)] + pub fn position(&self) -> usize { + self.pos + } + + #[inline] + pub fn remaining(&self) -> usize { + self.data.len() - self.pos + } + + #[inline] + fn require(&self, n: usize) -> crate::Result<()> { + if self.pos + n > self.data.len() { + return Err(Error::UnexpectedError { + message: format!( + "avro cursor: need {} bytes at offset {}, but only {} remain", + n, + self.pos, + self.remaining() + ), + source: None, + }); + } + Ok(()) + } + + /// Read a zigzag-encoded variable-length long (Avro int and long encoding). + #[inline] + pub fn read_long(&mut self) -> crate::Result { + let raw = if self.data.len() - self.pos >= 10 { + self.read_varint_fast() + } else { + self.read_varint_slow() + }?; + Ok(((raw >> 1) as i64) ^ -((raw & 1) as i64)) + } + + /// Fast path: no per-byte bounds check (caller guarantees >= 10 bytes remain). 
+ #[inline] + fn read_varint_fast(&mut self) -> crate::Result { + let buf = &self.data[self.pos..]; + let mut raw: u64 = 0; + let mut shift: u32 = 0; + let mut i = 0; + loop { + let b = buf[i] as u64; + i += 1; + raw |= (b & 0x7F) << shift; + if b & 0x80 == 0 { + self.pos += i; + return Ok(raw); + } + shift += 7; + if shift >= 64 { + return Err(Error::UnexpectedError { + message: "avro cursor: varint overflow".into(), + source: None, + }); + } + } + } + + /// Slow path: bounds check each byte (< 10 bytes remaining). + #[inline] + fn read_varint_slow(&mut self) -> crate::Result { + let mut raw: u64 = 0; + let mut shift: u32 = 0; + loop { + self.require(1)?; + let b = self.data[self.pos] as u64; + self.pos += 1; + raw |= (b & 0x7F) << shift; + if b & 0x80 == 0 { + return Ok(raw); + } + shift += 7; + if shift >= 64 { + return Err(Error::UnexpectedError { + message: "avro cursor: varint overflow".into(), + source: None, + }); + } + } + } + + #[inline] + pub fn read_int(&mut self) -> crate::Result { + let raw = if self.data.len() - self.pos >= 5 { + self.read_varint_int_fast() + } else { + self.read_varint_slow() + }?; + let zigzag = ((raw >> 1) as i64) ^ -((raw & 1) as i64); + Ok(zigzag as i32) + } + + /// Fast path for int: no per-byte bounds check (caller guarantees >= 5 bytes remain). 
+ #[inline] + fn read_varint_int_fast(&mut self) -> crate::Result { + let buf = &self.data[self.pos..]; + let mut raw: u64 = 0; + let mut shift: u32 = 0; + let mut i = 0; + loop { + let b = buf[i] as u64; + i += 1; + raw |= (b & 0x7F) << shift; + if b & 0x80 == 0 { + self.pos += i; + return Ok(raw); + } + shift += 7; + if shift >= 35 { + return Err(Error::UnexpectedError { + message: "avro cursor: int varint overflow".into(), + source: None, + }); + } + } + } + + #[inline] + #[allow(dead_code)] + pub fn read_boolean(&mut self) -> crate::Result { + self.require(1)?; + let b = self.data[self.pos]; + self.pos += 1; + Ok(b != 0) + } + + #[inline] + #[allow(dead_code)] + pub fn read_float(&mut self) -> crate::Result { + self.require(4)?; + let bytes: [u8; 4] = self.data[self.pos..self.pos + 4].try_into().unwrap(); + self.pos += 4; + Ok(f32::from_le_bytes(bytes)) + } + + #[inline] + #[allow(dead_code)] + pub fn read_double(&mut self) -> crate::Result { + self.require(8)?; + let bytes: [u8; 8] = self.data[self.pos..self.pos + 8].try_into().unwrap(); + self.pos += 8; + Ok(f64::from_le_bytes(bytes)) + } + + /// Read Avro bytes: length-prefixed raw bytes, zero-copy. + #[inline] + pub fn read_bytes(&mut self) -> crate::Result<&'a [u8]> { + let raw_len = self.read_long()?; + if raw_len < 0 { + return Err(Error::UnexpectedError { + message: format!("avro cursor: negative bytes length: {raw_len}"), + source: None, + }); + } + let len = raw_len as usize; + self.require(len)?; + let slice = &self.data[self.pos..self.pos + len]; + self.pos += len; + Ok(slice) + } + + /// Read Avro string: length-prefixed UTF-8, zero-copy. + #[inline] + pub fn read_string(&mut self) -> crate::Result<&'a str> { + let bytes = self.read_bytes()?; + std::str::from_utf8(bytes).map_err(|e| Error::UnexpectedError { + message: format!("avro cursor: invalid UTF-8 string: {e}"), + source: None, + }) + } + + /// Read a union index (same encoding as long). 
+ #[inline] + pub fn read_union_index(&mut self) -> crate::Result { + self.read_long() + } + + /// Skip `n` raw bytes. + #[inline] + pub fn skip_raw(&mut self, n: usize) -> crate::Result<()> { + self.require(n)?; + self.pos += n; + Ok(()) + } + + /// Skip an Avro bytes or string value. + #[inline] + pub fn skip_bytes(&mut self) -> crate::Result<()> { + let raw_len = self.read_long()?; + if raw_len < 0 { + return Err(Error::UnexpectedError { + message: format!("avro cursor: negative bytes length: {raw_len}"), + source: None, + }); + } + self.skip_raw(raw_len as usize) + } + + /// Skip an Avro long/int value. + #[inline] + pub fn skip_long(&mut self) -> crate::Result<()> { + self.read_long().map(|_| ()) + } + + /// Read Avro fixed bytes of known length, zero-copy. + #[inline] + pub fn read_fixed(&mut self, len: usize) -> crate::Result<&'a [u8]> { + self.require(len)?; + let slice = &self.data[self.pos..self.pos + len]; + self.pos += len; + Ok(slice) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn zigzag_encode(n: i64) -> Vec { + let mut encoded: u64 = ((n << 1) ^ (n >> 63)) as u64; + let mut buf = Vec::new(); + loop { + if encoded & !0x7F == 0 { + buf.push(encoded as u8); + break; + } + buf.push((encoded & 0x7F | 0x80) as u8); + encoded >>= 7; + } + buf + } + + #[test] + fn test_read_long_values() { + for val in [0i64, 1, -1, 42, -42, 127, -128, i64::MAX, i64::MIN] { + let bytes = zigzag_encode(val); + let mut cursor = AvroCursor::new(&bytes); + assert_eq!(cursor.read_long().unwrap(), val, "failed for {val}"); + assert_eq!(cursor.remaining(), 0); + } + } + + #[test] + fn test_read_int() { + let bytes = zigzag_encode(100); + let mut cursor = AvroCursor::new(&bytes); + assert_eq!(cursor.read_int().unwrap(), 100); + } + + #[test] + fn test_read_boolean() { + let mut cursor = AvroCursor::new(&[0, 1]); + assert!(!cursor.read_boolean().unwrap()); + assert!(cursor.read_boolean().unwrap()); + } + + #[test] + fn test_read_float_double() { + let f_bytes = 
std::f32::consts::PI.to_le_bytes(); + let d_bytes = std::f64::consts::E.to_le_bytes(); + let mut data = Vec::new(); + data.extend_from_slice(&f_bytes); + data.extend_from_slice(&d_bytes); + + let mut cursor = AvroCursor::new(&data); + assert!((cursor.read_float().unwrap() - std::f32::consts::PI).abs() < 1e-6); + assert!((cursor.read_double().unwrap() - std::f64::consts::E).abs() < 1e-10); + } + + #[test] + fn test_read_bytes_and_string() { + let mut data = Vec::new(); + // bytes: length=5, content="hello" + data.extend_from_slice(&zigzag_encode(5)); + data.extend_from_slice(b"hello"); + // string: length=5, content="world" + data.extend_from_slice(&zigzag_encode(5)); + data.extend_from_slice(b"world"); + + let mut cursor = AvroCursor::new(&data); + assert_eq!(cursor.read_bytes().unwrap(), b"hello"); + assert_eq!(cursor.read_string().unwrap(), "world"); + } + + #[test] + fn test_skip() { + let mut data = Vec::new(); + data.extend_from_slice(&zigzag_encode(3)); + data.extend_from_slice(b"abc"); + data.extend_from_slice(&zigzag_encode(99)); + + let mut cursor = AvroCursor::new(&data); + cursor.skip_bytes().unwrap(); + assert_eq!(cursor.read_int().unwrap(), 99); + } + + #[test] + fn test_eof_error() { + let mut cursor = AvroCursor::new(&[]); + assert!(cursor.read_long().is_err()); + } +} diff --git a/crates/paimon/src/spec/avro/decode.rs b/crates/paimon/src/spec/avro/decode.rs new file mode 100644 index 00000000..f4f5cff6 --- /dev/null +++ b/crates/paimon/src/spec/avro/decode.rs @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::cursor::AvroCursor; +use super::schema::WriterSchema; + +/// Trait for types that can be decoded directly from Avro binary data. +pub trait AvroRecordDecode: Sized { + fn decode(cursor: &mut AvroCursor, writer_schema: &WriterSchema) -> crate::Result; +} + +/// Safely negate a negative Avro block count to usize. +/// Avro uses negative counts to indicate that a block-size-in-bytes follows. +#[inline] +pub(crate) fn neg_count_to_usize(count: i64) -> crate::Result { + count + .checked_neg() + .map(|v| v as usize) + .ok_or_else(|| crate::Error::UnexpectedError { + message: format!("avro decode: block count overflow: {count}"), + source: None, + }) +} diff --git a/crates/paimon/src/spec/avro/decode_helpers.rs b/crates/paimon/src/spec/avro/decode_helpers.rs new file mode 100644 index 00000000..b36aec15 --- /dev/null +++ b/crates/paimon/src/spec/avro/decode_helpers.rs @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::cursor::AvroCursor; +use super::schema::{FieldSchema, WriterSchema}; + +/// Extract the record WriterSchema from a field schema. +/// Nullable unions are already unwrapped at parse time, so this only matches direct records. +pub(crate) fn extract_record_schema(schema: &FieldSchema) -> Option<&WriterSchema> { + match schema { + FieldSchema::Record(ws) => Some(ws), + _ => None, + } +} + +pub(crate) fn read_int_field(cursor: &mut AvroCursor, nullable: bool) -> crate::Result { + if nullable { + let idx = cursor.read_union_index()?; + if idx == 0 { + return Ok(0); + } + } + cursor.read_int() +} + +pub(crate) fn read_long_field(cursor: &mut AvroCursor, nullable: bool) -> crate::Result { + if nullable { + let idx = cursor.read_union_index()?; + if idx == 0 { + return Ok(0); + } + } + cursor.read_long() +} + +pub(crate) fn read_bytes_field(cursor: &mut AvroCursor, nullable: bool) -> crate::Result> { + if nullable { + let idx = cursor.read_union_index()?; + if idx == 0 { + return Ok(vec![]); + } + } + Ok(cursor.read_bytes()?.to_vec()) +} + +pub(crate) fn read_string_field(cursor: &mut AvroCursor, nullable: bool) -> crate::Result { + if nullable { + let idx = cursor.read_union_index()?; + if idx == 0 { + return Ok(String::new()); + } + } + Ok(cursor.read_string()?.to_string()) +} + +const EMPTY_PARTITION: [u8; 4] = [0, 0, 0, 0]; + +/// Null/missing/empty partition → valid empty BinaryRow (arity=0). 
+pub(crate) fn normalize_partition(partition: Option>) -> Vec { + match partition { + Some(p) if p.len() >= 4 => p, + _ => EMPTY_PARTITION.to_vec(), + } +} diff --git a/crates/paimon/src/spec/avro/index_manifest_entry_decode.rs b/crates/paimon/src/spec/avro/index_manifest_entry_decode.rs new file mode 100644 index 00000000..b35453f0 --- /dev/null +++ b/crates/paimon/src/spec/avro/index_manifest_entry_decode.rs @@ -0,0 +1,202 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use super::cursor::AvroCursor; +use super::decode::{neg_count_to_usize, AvroRecordDecode}; +use super::decode_helpers::{ + normalize_partition, read_bytes_field, read_int_field, read_long_field, read_string_field, +}; +use super::schema::{skip_nullable_field, WriterSchema}; +use crate::spec::index_manifest::IndexManifestEntry; +use crate::spec::manifest_common::FileKind; +use crate::spec::{DeletionVectorMeta, GlobalIndexMeta, IndexFileMeta}; +use indexmap::IndexMap; + +impl AvroRecordDecode for IndexManifestEntry { + fn decode(cursor: &mut AvroCursor, writer_schema: &WriterSchema) -> crate::Result { + let mut version: Option = None; + let mut kind: Option = None; + let mut partition: Option> = None; + let mut bucket: Option = None; + let mut index_type: Option = None; + let mut file_name: Option = None; + let mut file_size: Option = None; + let mut row_count: Option = None; + let mut deletion_vectors_ranges: Option> = None; + let mut global_index_meta: Option = None; + + for field in &writer_schema.fields { + match field.name.as_str() { + "_VERSION" => version = Some(read_int_field(cursor, field.nullable)?), + "_KIND" => { + let v = read_int_field(cursor, field.nullable)?; + kind = Some(match v { + 0 => FileKind::Add, + 1 => FileKind::Delete, + _ => { + return Err(crate::Error::UnexpectedError { + message: format!("unknown FileKind: {v}"), + source: None, + }) + } + }); + } + "_PARTITION" => partition = Some(read_bytes_field(cursor, field.nullable)?), + "_BUCKET" => bucket = Some(read_int_field(cursor, field.nullable)?), + "_INDEX_TYPE" => index_type = Some(read_string_field(cursor, field.nullable)?), + "_FILE_NAME" => file_name = Some(read_string_field(cursor, field.nullable)?), + "_FILE_SIZE" => file_size = Some(read_long_field(cursor, field.nullable)? as i32), + "_ROW_COUNT" => row_count = Some(read_long_field(cursor, field.nullable)? 
as i32), + "_DELETIONS_VECTORS_RANGES" | "_DELETION_VECTORS_RANGES" => { + deletion_vectors_ranges = decode_nullable_dv_ranges(cursor, field.nullable)?; + } + "_GLOBAL_INDEX" => { + global_index_meta = decode_nullable_global_index(cursor, field.nullable)?; + } + _ => skip_nullable_field(cursor, &field.schema, field.nullable)?, + } + } + + Ok(IndexManifestEntry { + version: version.unwrap_or(1), + kind: kind.unwrap_or(FileKind::Add), + partition: normalize_partition(partition), + bucket: bucket.unwrap_or(0), + index_file: IndexFileMeta { + index_type: index_type.unwrap_or_default(), + file_name: file_name.unwrap_or_default(), + file_size: file_size.unwrap_or(0), + row_count: row_count.unwrap_or(0), + deletion_vectors_ranges, + global_index_meta, + }, + }) + } +} + +fn decode_nullable_dv_ranges( + cursor: &mut AvroCursor, + nullable: bool, +) -> crate::Result>> { + if nullable { + let idx = cursor.read_union_index()?; + if idx == 0 { + return Ok(None); + } + } + // Array of nullable records + let mut map = IndexMap::new(); + loop { + let count = cursor.read_long()?; + if count == 0 { + break; + } + let count = if count < 0 { + cursor.skip_long()?; + neg_count_to_usize(count)? + } else { + count as usize + }; + for _ in 0..count { + // Each item is union ["null", record] + let item_idx = cursor.read_union_index()?; + if item_idx == 0 { + continue; + } + // Record fields: f0 (string), f1 (int), f2 (int), _CARDINALITY (nullable long) + let f0 = cursor.read_string()?.to_string(); + let f1 = cursor.read_int()?; + let f2 = cursor.read_int()?; + let cardinality = { + let c_idx = cursor.read_union_index()?; + if c_idx == 0 { + None + } else { + Some(cursor.read_long()?) 
+ } + }; + map.insert( + f0, + DeletionVectorMeta { + offset: f1, + length: f2, + cardinality, + }, + ); + } + } + Ok(Some(map)) +} + +fn decode_nullable_global_index( + cursor: &mut AvroCursor, + nullable: bool, +) -> crate::Result> { + if nullable { + let idx = cursor.read_union_index()?; + if idx == 0 { + return Ok(None); + } + } + let row_range_start = cursor.read_long()?; + let row_range_end = cursor.read_long()?; + let index_field_id = cursor.read_int()?; + + // _EXTRA_FIELD_IDS: nullable array of int + let extra_field_ids = { + let u_idx = cursor.read_union_index()?; + if u_idx == 0 { + None + } else { + let mut ids = Vec::new(); + loop { + let count = cursor.read_long()?; + if count == 0 { + break; + } + let count = if count < 0 { + cursor.skip_long()?; + neg_count_to_usize(count)? + } else { + count as usize + }; + for _ in 0..count { + ids.push(cursor.read_int()?); + } + } + Some(ids) + } + }; + + // _INDEX_META: nullable bytes + let index_meta = { + let u_idx = cursor.read_union_index()?; + if u_idx == 0 { + None + } else { + Some(cursor.read_bytes()?.to_vec()) + } + }; + + Ok(Some(GlobalIndexMeta { + row_range_start, + row_range_end, + index_field_id, + extra_field_ids, + index_meta, + })) +} diff --git a/crates/paimon/src/spec/avro/manifest_entry_decode.rs b/crates/paimon/src/spec/avro/manifest_entry_decode.rs new file mode 100644 index 00000000..1d3ffcfa --- /dev/null +++ b/crates/paimon/src/spec/avro/manifest_entry_decode.rs @@ -0,0 +1,410 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::cursor::AvroCursor; +use super::decode::{neg_count_to_usize, AvroRecordDecode}; +use super::decode_helpers::{ + extract_record_schema, normalize_partition, read_bytes_field, read_int_field, read_long_field, + read_string_field, +}; +use super::manifest_file_meta_decode::decode_nullable_binary_table_stats; +use super::schema::{skip_nullable_field, FieldSchema, WriterSchema}; +use crate::spec::manifest_common::FileKind; +use crate::spec::stats::BinaryTableStats; +use crate::spec::DataFileMeta; +use crate::spec::ManifestEntry; +use chrono::{DateTime, Utc}; + +impl AvroRecordDecode for ManifestEntry { + fn decode(cursor: &mut AvroCursor, writer_schema: &WriterSchema) -> crate::Result { + let mut kind: Option = None; + let mut partition: Option> = None; + let mut bucket: Option = None; + let mut total_buckets: Option = None; + let mut file: Option = None; + let mut version: Option = None; + + for field in &writer_schema.fields { + match field.name.as_str() { + "_KIND" => { + let v = read_int_field(cursor, field.nullable)?; + kind = Some(match v { + 0 => FileKind::Add, + 1 => FileKind::Delete, + _ => { + return Err(crate::Error::UnexpectedError { + message: format!("unknown FileKind: {v}"), + source: None, + }) + } + }); + } + "_PARTITION" => partition = Some(read_bytes_field(cursor, field.nullable)?), + "_BUCKET" => bucket = Some(read_int_field(cursor, field.nullable)?), + "_TOTAL_BUCKETS" => total_buckets = Some(read_int_field(cursor, field.nullable)?), + "_FILE" => { + file = decode_nullable_data_file_meta(cursor, &field.schema, 
field.nullable)?; + } + "_VERSION" => version = Some(read_int_field(cursor, field.nullable)?), + _ => skip_nullable_field(cursor, &field.schema, field.nullable)?, + } + } + + Ok(ManifestEntry::new( + kind.unwrap_or(FileKind::Add), + normalize_partition(partition), + bucket.unwrap_or(0), + total_buckets.unwrap_or(0), + file.unwrap_or_else(default_data_file_meta), + version.unwrap_or(0), + )) + } +} + +/// Decode ManifestEntry records with a filter applied on lightweight fields. +/// +/// Decodes only _KIND, _PARTITION, _BUCKET, _TOTAL_BUCKETS, _VERSION first. +/// If `filter` returns false, skips the expensive _FILE (DataFileMeta) decoding. +/// Returns only entries that pass the filter. +pub(crate) fn decode_manifest_entries_filtered( + cursor: &mut AvroCursor, + writer_schema: &WriterSchema, + is_union_wrapped: bool, + filter: &mut F, +) -> crate::Result> +where + F: FnMut(FileKind, &[u8], i32, i32) -> bool, +{ + if is_union_wrapped { + let idx = cursor.read_union_index()?; + if idx == 0 { + return Err(crate::Error::UnexpectedError { + message: "avro decode: unexpected null in top-level union".into(), + source: None, + }); + } + } + + // Two-pass decode: first collect lightweight fields and record _FILE position, + // then conditionally decode _FILE. 
+ let mut kind: Option = None; + let mut partition: Option> = None; + let mut bucket: Option = None; + let mut total_buckets: Option = None; + let mut version: Option = None; + let mut file: Option = None; + let mut file_skipped = false; + + for field in &writer_schema.fields { + match field.name.as_str() { + "_KIND" => { + let v = read_int_field(cursor, field.nullable)?; + kind = Some(match v { + 0 => FileKind::Add, + 1 => FileKind::Delete, + _ => { + return Err(crate::Error::UnexpectedError { + message: format!("unknown FileKind: {v}"), + source: None, + }) + } + }); + } + "_PARTITION" => partition = Some(read_bytes_field(cursor, field.nullable)?), + "_BUCKET" => bucket = Some(read_int_field(cursor, field.nullable)?), + "_TOTAL_BUCKETS" => total_buckets = Some(read_int_field(cursor, field.nullable)?), + "_FILE" => { + let can_filter = kind.is_some() + && partition.is_some() + && bucket.is_some() + && total_buckets.is_some(); + if can_filter { + let k = kind.unwrap_or(FileKind::Add); + let p = partition.as_deref().unwrap_or(&[]); + let b = bucket.unwrap_or(0); + let tb = total_buckets.unwrap_or(0); + if filter(k, p, b, tb) { + file = + decode_nullable_data_file_meta(cursor, &field.schema, field.nullable)?; + } else { + skip_nullable_field(cursor, &field.schema, field.nullable)?; + file_skipped = true; + } + } else { + file = decode_nullable_data_file_meta(cursor, &field.schema, field.nullable)?; + } + } + "_VERSION" => version = Some(read_int_field(cursor, field.nullable)?), + _ => skip_nullable_field(cursor, &field.schema, field.nullable)?, + } + } + + if file_skipped { + return Ok(None); + } + + Ok(Some(ManifestEntry::new( + kind.unwrap_or(FileKind::Add), + normalize_partition(partition), + bucket.unwrap_or(0), + total_buckets.unwrap_or(0), + file.unwrap_or_else(default_data_file_meta), + version.unwrap_or(0), + ))) +} + +fn decode_nullable_data_file_meta( + cursor: &mut AvroCursor, + field_schema: &FieldSchema, + nullable: bool, +) -> crate::Result> { + if 
nullable { + let idx = cursor.read_union_index()?; + if idx == 0 { + return Ok(None); + } + } + let record_schema = + extract_record_schema(field_schema).ok_or_else(|| crate::Error::UnexpectedError { + message: "avro decode: _FILE field is not a record".into(), + source: None, + })?; + decode_data_file_meta(cursor, record_schema).map(Some) +} + +/// Read string array, handling both `{"type":"array",...}` and `["null", {"type":"array",...}]`. +fn read_string_array_field(cursor: &mut AvroCursor, nullable: bool) -> crate::Result> { + if nullable { + let idx = cursor.read_union_index()?; + if idx == 0 { + return Ok(vec![]); + } + } + decode_string_array(cursor) +} + +fn decode_data_file_meta( + cursor: &mut AvroCursor, + writer_schema: &WriterSchema, +) -> crate::Result { + let mut file_name: Option = None; + let mut file_size: Option = None; + let mut row_count: Option = None; + let mut min_key: Option> = None; + let mut max_key: Option> = None; + let mut key_stats: Option = None; + let mut value_stats: Option = None; + let mut min_sequence_number: Option = None; + let mut max_sequence_number: Option = None; + let mut schema_id: Option = None; + let mut level: Option = None; + let mut extra_files: Option> = None; + let mut creation_time: Option> = None; + let mut delete_row_count: Option = None; + let mut embedded_index: Option> = None; + let mut file_source: Option = None; + let mut value_stats_cols: Option> = None; + let mut external_path: Option = None; + let mut first_row_id: Option = None; + let mut write_cols: Option> = None; + + for field in &writer_schema.fields { + match field.name.as_str() { + "_FILE_NAME" => file_name = Some(read_string_field(cursor, field.nullable)?), + "_FILE_SIZE" => file_size = Some(read_long_field(cursor, field.nullable)?), + "_ROW_COUNT" => row_count = Some(read_long_field(cursor, field.nullable)?), + "_MIN_KEY" => min_key = Some(read_bytes_field(cursor, field.nullable)?), + "_MAX_KEY" => max_key = Some(read_bytes_field(cursor, 
field.nullable)?), + "_KEY_STATS" => { + key_stats = + decode_nullable_binary_table_stats(cursor, &field.schema, field.nullable)? + } + "_VALUE_STATS" => { + value_stats = + decode_nullable_binary_table_stats(cursor, &field.schema, field.nullable)? + } + "_MIN_SEQUENCE_NUMBER" => { + min_sequence_number = Some(read_long_field(cursor, field.nullable)?) + } + "_MAX_SEQUENCE_NUMBER" => { + max_sequence_number = Some(read_long_field(cursor, field.nullable)?) + } + "_SCHEMA_ID" => schema_id = Some(read_long_field(cursor, field.nullable)?), + "_LEVEL" => level = Some(read_int_field(cursor, field.nullable)?), + "_EXTRA_FILES" => extra_files = Some(read_string_array_field(cursor, field.nullable)?), + "_CREATION_TIME" => { + creation_time = decode_nullable_timestamp_millis(cursor, field.nullable)? + } + "_DELETE_ROW_COUNT" => delete_row_count = decode_nullable_long(cursor, field.nullable)?, + "_EMBEDDED_FILE_INDEX" => { + embedded_index = decode_nullable_bytes(cursor, field.nullable)? + } + "_FILE_SOURCE" => file_source = decode_nullable_int(cursor, field.nullable)?, + "_VALUE_STATS_COLS" => { + value_stats_cols = decode_nullable_string_array(cursor, field.nullable)? 
+ } + "_EXTERNAL_PATH" => external_path = decode_nullable_string(cursor, field.nullable)?, + "_FIRST_ROW_ID" => first_row_id = decode_nullable_long(cursor, field.nullable)?, + "_WRITE_COLS" => write_cols = decode_nullable_string_array(cursor, field.nullable)?, + _ => skip_nullable_field(cursor, &field.schema, field.nullable)?, + } + } + + Ok(DataFileMeta { + file_name: file_name.unwrap_or_default(), + file_size: file_size.unwrap_or(0), + row_count: row_count.unwrap_or(0), + min_key: min_key.unwrap_or_default(), + max_key: max_key.unwrap_or_default(), + key_stats: key_stats.unwrap_or_else(|| BinaryTableStats::new(vec![], vec![], vec![])), + value_stats: value_stats.unwrap_or_else(|| BinaryTableStats::new(vec![], vec![], vec![])), + min_sequence_number: min_sequence_number.unwrap_or(0), + max_sequence_number: max_sequence_number.unwrap_or(0), + schema_id: schema_id.unwrap_or(0), + level: level.unwrap_or(0), + extra_files: extra_files.unwrap_or_default(), + creation_time, + delete_row_count, + embedded_index, + file_source, + value_stats_cols, + external_path, + first_row_id, + write_cols, + }) +} + +fn decode_string_array(cursor: &mut AvroCursor) -> crate::Result> { + let mut result = Vec::new(); + loop { + let count = cursor.read_long()?; + if count == 0 { + break; + } + let count = if count < 0 { + cursor.skip_long()?; + neg_count_to_usize(count)? 
+ } else { + count as usize + }; + result.reserve(count); + for _ in 0..count { + result.push(cursor.read_string()?.to_string()); + } + } + Ok(result) +} + +fn decode_nullable_long(cursor: &mut AvroCursor, nullable: bool) -> crate::Result> { + if nullable { + let idx = cursor.read_union_index()?; + if idx == 0 { + return Ok(None); + } + } + Ok(Some(cursor.read_long()?)) +} + +fn decode_nullable_int(cursor: &mut AvroCursor, nullable: bool) -> crate::Result> { + if nullable { + let idx = cursor.read_union_index()?; + if idx == 0 { + return Ok(None); + } + } + Ok(Some(cursor.read_int()?)) +} + +fn decode_nullable_bytes( + cursor: &mut AvroCursor, + nullable: bool, +) -> crate::Result>> { + if nullable { + let idx = cursor.read_union_index()?; + if idx == 0 { + return Ok(None); + } + } + Ok(Some(cursor.read_bytes()?.to_vec())) +} + +fn decode_nullable_string( + cursor: &mut AvroCursor, + nullable: bool, +) -> crate::Result> { + if nullable { + let idx = cursor.read_union_index()?; + if idx == 0 { + return Ok(None); + } + } + Ok(Some(cursor.read_string()?.to_string())) +} + +fn decode_nullable_string_array( + cursor: &mut AvroCursor, + nullable: bool, +) -> crate::Result>> { + if nullable { + let idx = cursor.read_union_index()?; + if idx == 0 { + return Ok(None); + } + } + Ok(Some(decode_string_array(cursor)?)) +} + +fn decode_nullable_timestamp_millis( + cursor: &mut AvroCursor, + nullable: bool, +) -> crate::Result>> { + if nullable { + let idx = cursor.read_union_index()?; + if idx == 0 { + return Ok(None); + } + } + let millis = cursor.read_long()?; + let secs = millis.div_euclid(1000); + let nanos = (millis.rem_euclid(1000) * 1_000_000) as u32; + Ok(DateTime::from_timestamp(secs, nanos)) +} + +fn default_data_file_meta() -> DataFileMeta { + DataFileMeta { + file_name: String::new(), + file_size: 0, + row_count: 0, + min_key: vec![], + max_key: vec![], + key_stats: BinaryTableStats::new(vec![], vec![], vec![]), + value_stats: BinaryTableStats::new(vec![], vec![], 
vec![]), + min_sequence_number: 0, + max_sequence_number: 0, + schema_id: 0, + level: 0, + extra_files: vec![], + creation_time: None, + delete_row_count: None, + embedded_index: None, + file_source: None, + value_stats_cols: None, + external_path: None, + first_row_id: None, + write_cols: None, + } +} diff --git a/crates/paimon/src/spec/avro/manifest_file_meta_decode.rs b/crates/paimon/src/spec/avro/manifest_file_meta_decode.rs new file mode 100644 index 00000000..f0ce6ae6 --- /dev/null +++ b/crates/paimon/src/spec/avro/manifest_file_meta_decode.rs @@ -0,0 +1,139 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use super::cursor::AvroCursor; +use super::decode::{neg_count_to_usize, AvroRecordDecode}; +use super::decode_helpers::{ + extract_record_schema, read_bytes_field, read_int_field, read_long_field, read_string_field, +}; +use super::schema::{skip_nullable_field, FieldSchema, WriterSchema}; +use crate::spec::stats::BinaryTableStats; +use crate::spec::ManifestFileMeta; + +impl AvroRecordDecode for ManifestFileMeta { + fn decode(cursor: &mut AvroCursor, writer_schema: &WriterSchema) -> crate::Result { + let mut version: Option = None; + let mut file_name: Option = None; + let mut file_size: Option = None; + let mut num_added_files: Option = None; + let mut num_deleted_files: Option = None; + let mut partition_stats: Option = None; + let mut schema_id: Option = None; + + for field in &writer_schema.fields { + match field.name.as_str() { + "_VERSION" => version = Some(read_int_field(cursor, field.nullable)?), + "_FILE_NAME" => file_name = Some(read_string_field(cursor, field.nullable)?), + "_FILE_SIZE" => file_size = Some(read_long_field(cursor, field.nullable)?), + "_NUM_ADDED_FILES" => { + num_added_files = Some(read_long_field(cursor, field.nullable)?) + } + "_NUM_DELETED_FILES" => { + num_deleted_files = Some(read_long_field(cursor, field.nullable)?) + } + "_PARTITION_STATS" => { + partition_stats = + decode_nullable_binary_table_stats(cursor, &field.schema, field.nullable)?; + } + "_SCHEMA_ID" => schema_id = Some(read_long_field(cursor, field.nullable)?), + _ => skip_nullable_field(cursor, &field.schema, field.nullable)?, + } + } + + Ok(ManifestFileMeta::new_with_version( + version.unwrap_or(2), + file_name.unwrap_or_default(), + file_size.unwrap_or(0), + num_added_files.unwrap_or(0), + num_deleted_files.unwrap_or(0), + partition_stats.unwrap_or_else(|| BinaryTableStats::new(vec![], vec![], vec![])), + schema_id.unwrap_or(0), + )) + } +} + +/// Decode a nullable BinaryTableStats: union ["null", record] or direct record. 
pub(crate) fn decode_nullable_binary_table_stats(
    cursor: &mut AvroCursor,
    schema: &FieldSchema,
    nullable: bool,
) -> crate::Result<Option<BinaryTableStats>> {
    // For ["null", record] unions the union index comes first; branch 0 is null.
    if nullable {
        let idx = cursor.read_union_index()?;
        if idx == 0 {
            return Ok(None);
        }
    }
    let record_schema =
        extract_record_schema(schema).ok_or_else(|| crate::Error::UnexpectedError {
            message: "avro decode: BinaryTableStats field is not a record".into(),
            source: None,
        })?;
    let mut min_values: Option<Vec<u8>> = None;
    let mut max_values: Option<Vec<u8>> = None;
    let mut null_counts: Option<Vec<Option<i64>>> = None;
    for field in &record_schema.fields {
        match field.name.as_str() {
            "_MIN_VALUES" => min_values = Some(read_bytes_field(cursor, field.nullable)?),
            "_MAX_VALUES" => max_values = Some(read_bytes_field(cursor, field.nullable)?),
            "_NULL_COUNTS" => {
                null_counts = Some(decode_nullable_long_array(cursor, field.nullable)?)
            }
            // Use the name imported at the top of this file, consistent with
            // the `decode` impl above (was fully qualified here).
            _ => skip_nullable_field(cursor, &field.schema, field.nullable)?,
        }
    }
    Ok(Some(BinaryTableStats::new(
        min_values.unwrap_or_default(),
        max_values.unwrap_or_default(),
        null_counts.unwrap_or_default(),
    )))
}

/// Decode an Avro array of union ["null", "long"] items.
///
/// A null array (union branch 0) is mapped to an empty `Vec`, matching the
/// "no null counts recorded" meaning downstream.
fn decode_nullable_long_array(
    cursor: &mut AvroCursor,
    nullable: bool,
) -> crate::Result<Vec<Option<i64>>> {
    if nullable {
        let idx = cursor.read_union_index()?;
        if idx == 0 {
            return Ok(vec![]);
        }
    }
    let mut result = Vec::new();
    loop {
        let count = cursor.read_long()?;
        if count == 0 {
            break;
        }
        let count = if count < 0 {
            cursor.skip_long()?; // block byte size
            neg_count_to_usize(count)?
+ } else { + count as usize + }; + for _ in 0..count { + // Each item is union ["null", "long"] + let item_idx = cursor.read_union_index()?; + if item_idx == 0 { + result.push(None); + } else { + result.push(Some(cursor.read_long()?)); + } + } + } + Ok(result) +} diff --git a/crates/paimon/src/spec/avro/mod.rs b/crates/paimon/src/spec/avro/mod.rs new file mode 100644 index 00000000..f6cbb720 --- /dev/null +++ b/crates/paimon/src/spec/avro/mod.rs @@ -0,0 +1,204 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod cursor; +pub mod decode; +pub(crate) mod decode_helpers; +mod index_manifest_entry_decode; +pub(crate) mod manifest_entry_decode; +mod manifest_file_meta_decode; +pub mod ocf; +pub mod schema; + +use cursor::AvroCursor; +use decode::AvroRecordDecode; +use ocf::parse_ocf_streaming; +use schema::WriterSchema; +use std::sync::{Arc, RwLock}; + +/// Cache for parsed WriterSchemas, keyed by schema JSON string. +/// Same manifest type always produces the same schema JSON, so parsing +/// once and reusing across files within a scan saves repeated work. +/// Uses Vec instead of HashMap since Paimon tables typically have 1-2 distinct schemas. 
+pub struct SchemaCache { + cache: Vec<(String, Arc)>, +} + +impl SchemaCache { + pub fn new() -> Self { + Self { cache: Vec::new() } + } + + pub fn get_or_parse(&mut self, schema_json: &str) -> crate::Result> { + if let Some(cached) = self.cache.iter().find(|(k, _)| k == schema_json) { + return Ok(Arc::clone(&cached.1)); + } + let ws = Arc::new(WriterSchema::parse(schema_json)?); + self.cache.push((schema_json.to_string(), Arc::clone(&ws))); + Ok(ws) + } +} + +impl Default for SchemaCache { + fn default() -> Self { + Self::new() + } +} + +/// Thread-safe schema cache for sharing across concurrent async tasks. +/// Wraps `SchemaCache` in `Arc>` so multiple tasks can reuse +/// the same parsed `WriterSchema` without re-parsing. +#[derive(Clone)] +pub struct SharedSchemaCache { + inner: Arc>, +} + +impl SharedSchemaCache { + pub fn new() -> Self { + Self { + inner: Arc::new(RwLock::new(SchemaCache::new())), + } + } + + pub fn get_or_parse(&self, schema_json: &str) -> crate::Result> { + // Fast path: read lock for cache hit + { + let cache = self.inner.read().unwrap_or_else(|e| e.into_inner()); + if let Some(cached) = cache.cache.iter().find(|(k, _)| k == schema_json) { + return Ok(Arc::clone(&cached.1)); + } + } + // Slow path: write lock for cache miss + self.inner + .write() + .unwrap_or_else(|e| e.into_inner()) + .get_or_parse(schema_json) + } +} + +impl Default for SharedSchemaCache { + fn default() -> Self { + Self::new() + } +} + +/// Read an Avro OCF file and decode records directly into `T`, bypassing +/// the intermediate `apache_avro::Value` representation. +pub fn from_avro_bytes_fast(bytes: &[u8]) -> crate::Result> { + let mut cache = SchemaCache::new(); + from_avro_bytes_with_cache(bytes, &mut cache) +} + +/// Same as `from_avro_bytes_fast` but reuses a `SchemaCache` across calls. 
+pub fn from_avro_bytes_with_cache( + bytes: &[u8], + cache: &mut SchemaCache, +) -> crate::Result> { + let (header, mut block_iter) = parse_ocf_streaming(bytes)?; + let writer_schema = cache.get_or_parse(&header.schema_json)?; + + let mut results = Vec::new(); + while let Some(block) = block_iter.next_block()? { + results.reserve(block.object_count); + let mut cursor = AvroCursor::new(&block.data); + for _ in 0..block.object_count { + let record = decode_top_level_record::(&mut cursor, &writer_schema)?; + results.push(record); + } + } + + Ok(results) +} + +/// Decode ManifestEntry records from Avro OCF bytes with a lightweight filter. +/// +/// The filter receives `(kind, partition_bytes, bucket, total_buckets)` and +/// returns true to keep the entry. Entries that fail the filter skip the +/// expensive `DataFileMeta` decoding entirely. +pub fn from_manifest_bytes_filtered( + bytes: &[u8], + cache: &mut SchemaCache, + filter: &mut F, +) -> crate::Result> +where + F: FnMut(crate::spec::FileKind, &[u8], i32, i32) -> bool, +{ + let (header, mut block_iter) = parse_ocf_streaming(bytes)?; + let writer_schema = cache.get_or_parse(&header.schema_json)?; + decode_manifest_streaming(&mut block_iter, &writer_schema, filter) +} + +/// Decode ManifestEntry records from Avro OCF bytes using a pre-resolved shared schema. +/// +/// Use this when the `WriterSchema` is shared across concurrent tasks via +/// `SharedSchemaCache`. Falls back to parsing if the OCF schema differs. 
+pub fn from_manifest_bytes_filtered_shared( + bytes: &[u8], + shared_cache: &SharedSchemaCache, + filter: &mut F, +) -> crate::Result> +where + F: FnMut(crate::spec::FileKind, &[u8], i32, i32) -> bool, +{ + let (header, mut block_iter) = parse_ocf_streaming(bytes)?; + let writer_schema = shared_cache.get_or_parse(&header.schema_json)?; + decode_manifest_streaming(&mut block_iter, &writer_schema, filter) +} + +fn decode_manifest_streaming( + block_iter: &mut ocf::OcfBlockIter<'_>, + writer_schema: &WriterSchema, + filter: &mut F, +) -> crate::Result> +where + F: FnMut(crate::spec::FileKind, &[u8], i32, i32) -> bool, +{ + let mut results = Vec::new(); + while let Some(block) = block_iter.next_block()? { + results.reserve(block.object_count); + let mut cursor = AvroCursor::new(&block.data); + for _ in 0..block.object_count { + if let Some(entry) = manifest_entry_decode::decode_manifest_entries_filtered( + &mut cursor, + writer_schema, + writer_schema.is_union_wrapped, + filter, + )? { + results.push(entry); + } + } + } + Ok(results) +} + +/// Decode a single record from the cursor, handling the top-level union wrapper +/// that Paimon uses (`["null", record]`). +fn decode_top_level_record( + cursor: &mut AvroCursor, + writer_schema: &WriterSchema, +) -> crate::Result { + if writer_schema.is_union_wrapped { + let idx = cursor.read_union_index()?; + if idx == 0 { + return Err(crate::Error::UnexpectedError { + message: "avro decode: unexpected null in top-level union".into(), + source: None, + }); + } + } + T::decode(cursor, writer_schema) +} diff --git a/crates/paimon/src/spec/avro/ocf.rs b/crates/paimon/src/spec/avro/ocf.rs new file mode 100644 index 00000000..aed7ee25 --- /dev/null +++ b/crates/paimon/src/spec/avro/ocf.rs @@ -0,0 +1,299 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::cursor::AvroCursor; +use super::decode::neg_count_to_usize; +use crate::Error; +use std::borrow::Cow; +use std::collections::HashMap; + +const AVRO_MAGIC: &[u8; 4] = b"Obj\x01"; +const SYNC_MARKER_LEN: usize = 16; + +/// A decoded Avro OCF header. +pub struct OcfHeader { + pub schema_json: String, + pub codec: OcfCodec, + pub sync_marker: [u8; SYNC_MARKER_LEN], +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum OcfCodec { + Null, + Snappy, + Zstandard, +} + +/// A single data block from the OCF. +pub struct OcfBlock<'a> { + pub object_count: usize, + pub data: Cow<'a, [u8]>, +} + +/// Streaming iterator over OCF blocks with lazy decompression and reusable decoder state. 
+pub struct OcfBlockIter<'a> { + cursor: AvroCursor<'a>, + codec: OcfCodec, + sync_marker: [u8; SYNC_MARKER_LEN], + snappy_decoder: snap::raw::Decoder, +} + +impl<'a> OcfBlockIter<'a> { + fn new(cursor: AvroCursor<'a>, codec: OcfCodec, sync_marker: [u8; SYNC_MARKER_LEN]) -> Self { + Self { + cursor, + codec, + sync_marker, + snappy_decoder: snap::raw::Decoder::new(), + } + } + + pub fn next_block(&mut self) -> crate::Result>> { + if self.cursor.remaining() == 0 { + return Ok(None); + } + + let raw_object_count = self.cursor.read_long()?; + if raw_object_count < 0 { + return Err(Error::UnexpectedError { + message: format!("avro ocf: negative object count: {raw_object_count}"), + source: None, + }); + } + let object_count = raw_object_count as usize; + let raw_compressed_size = self.cursor.read_long()?; + if raw_compressed_size < 0 { + return Err(Error::UnexpectedError { + message: format!("avro ocf: negative compressed size: {raw_compressed_size}"), + source: None, + }); + } + let compressed_size = raw_compressed_size as usize; + let compressed_data = self.cursor.read_fixed(compressed_size)?; + + let data = self.decompress(compressed_data)?; + + let block_sync = self.cursor.read_fixed(SYNC_MARKER_LEN)?; + if block_sync != self.sync_marker { + return Err(Error::UnexpectedError { + message: "avro ocf: sync marker mismatch".into(), + source: None, + }); + } + + Ok(Some(OcfBlock { object_count, data })) + } + + fn decompress(&mut self, data: &'a [u8]) -> crate::Result> { + match self.codec { + OcfCodec::Null => Ok(Cow::Borrowed(data)), + OcfCodec::Snappy => { + if data.len() < 4 { + return Err(Error::UnexpectedError { + message: "avro ocf: snappy block too short for CRC".into(), + source: None, + }); + } + let compressed = &data[..data.len() - 4]; + let expected_crc = u32::from_be_bytes(data[data.len() - 4..].try_into().unwrap()); + let decompressed = self + .snappy_decoder + .decompress_vec(compressed) + .map_err(|e| Error::UnexpectedError { + message: format!("avro 
ocf: snappy decompression failed: {e}"), + source: None, + })?; + let actual_crc = crc32fast::hash(&decompressed); + if actual_crc != expected_crc { + return Err(Error::UnexpectedError { + message: format!( + "avro ocf: snappy CRC32C mismatch: expected {expected_crc:#010x}, got {actual_crc:#010x}" + ), + source: None, + }); + } + Ok(Cow::Owned(decompressed)) + } + OcfCodec::Zstandard => { + let decompressed = + zstd::stream::decode_all(data).map_err(|e| Error::UnexpectedError { + message: format!("avro ocf: zstd decompression failed: {e}"), + source: None, + })?; + Ok(Cow::Owned(decompressed)) + } + } + } +} + +/// Parse an Avro OCF header and return a streaming block iterator. +pub fn parse_ocf_streaming(bytes: &[u8]) -> crate::Result<(OcfHeader, OcfBlockIter<'_>)> { + let mut cursor = AvroCursor::new(bytes); + + let magic = cursor.read_fixed(4)?; + if magic != AVRO_MAGIC { + return Err(Error::UnexpectedError { + message: "avro ocf: invalid magic bytes".into(), + source: None, + }); + } + + let meta = read_avro_map(&mut cursor)?; + + let schema_json = meta + .get("avro.schema") + .ok_or_else(|| Error::UnexpectedError { + message: "avro ocf: missing avro.schema in header".into(), + source: None, + })? + .clone(); + + let codec = match meta.get("avro.codec").map(|s| s.as_str()) { + None | Some("null") => OcfCodec::Null, + Some("snappy") => OcfCodec::Snappy, + Some("zstandard") => OcfCodec::Zstandard, + Some(other) => { + return Err(Error::UnexpectedError { + message: format!("avro ocf: unsupported codec: {other}"), + source: None, + }); + } + }; + + let sync_marker: [u8; SYNC_MARKER_LEN] = + cursor.read_fixed(SYNC_MARKER_LEN)?.try_into().unwrap(); + + let header = OcfHeader { + schema_json, + codec, + sync_marker, + }; + + let iter = OcfBlockIter::new(cursor, header.codec, header.sync_marker); + Ok((header, iter)) +} + +/// Parse an Avro OCF file into header + blocks (eagerly decompresses all blocks). 
+#[cfg(test)] +pub fn parse_ocf(bytes: &[u8]) -> crate::Result<(OcfHeader, Vec>)> { + let (header, mut iter) = parse_ocf_streaming(bytes)?; + let mut blocks = Vec::new(); + while let Some(block) = iter.next_block()? { + blocks.push(block); + } + Ok((header, blocks)) +} + +/// Read an Avro-encoded map (used for OCF file metadata). +/// Map encoding: series of blocks, each block: count(long), entries...; terminated by 0-count. +fn read_avro_map(cursor: &mut AvroCursor) -> crate::Result> { + let mut map = HashMap::new(); + loop { + let count = cursor.read_long()?; + if count == 0 { + break; + } + let count = if count < 0 { + // Negative count means the block size in bytes follows (we skip it). + cursor.skip_long()?; + neg_count_to_usize(count)? + } else { + count as usize + }; + for _ in 0..count { + let key = cursor.read_string()?.to_string(); + let value_bytes = cursor.read_bytes()?; + let value = String::from_utf8_lossy(value_bytes).into_owned(); + map.insert(key, value); + } + } + Ok(map) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_ocf_roundtrip() { + // Write a simple OCF with apache-avro, then parse with our reader + use apache_avro::{Codec, Schema, Writer}; + + let schema = Schema::parse_str(r#"{"type": "record", "name": "test", "fields": [{"name": "a", "type": "int"}, {"name": "b", "type": "string"}]}"#).unwrap(); + let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Null); + let mut record = apache_avro::types::Record::new(&schema).unwrap(); + record.put("a", 42i32); + record.put("b", "hello"); + writer.append(record).unwrap(); + let mut record2 = apache_avro::types::Record::new(&schema).unwrap(); + record2.put("a", 99i32); + record2.put("b", "world"); + writer.append(record2).unwrap(); + let bytes = writer.into_inner().unwrap(); + + let (header, blocks) = parse_ocf(&bytes).unwrap(); + assert_eq!(header.codec, OcfCodec::Null); + assert!(header.schema_json.contains("test")); + + let total_objects: usize = 
blocks.iter().map(|b| b.object_count).sum(); + assert_eq!(total_objects, 2); + } + + #[test] + fn test_parse_ocf_zstd() { + use apache_avro::{Codec, Schema, Writer}; + + let schema = Schema::parse_str( + r#"{"type": "record", "name": "test", "fields": [{"name": "x", "type": "long"}]}"#, + ) + .unwrap(); + let mut writer = Writer::with_codec( + &schema, + Vec::new(), + Codec::Zstandard(apache_avro::ZstandardSettings::default()), + ); + let mut record = apache_avro::types::Record::new(&schema).unwrap(); + record.put("x", 67890i64); + writer.append(record).unwrap(); + let bytes = writer.into_inner().unwrap(); + + let (header, blocks) = parse_ocf(&bytes).unwrap(); + assert_eq!(header.codec, OcfCodec::Zstandard); + assert_eq!(blocks.len(), 1); + assert_eq!(blocks[0].object_count, 1); + } + + #[test] + fn test_parse_ocf_snappy() { + use apache_avro::{Codec, Schema, Writer}; + + let schema = Schema::parse_str( + r#"{"type": "record", "name": "test", "fields": [{"name": "x", "type": "long"}]}"#, + ) + .unwrap(); + let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Snappy); + let mut record = apache_avro::types::Record::new(&schema).unwrap(); + record.put("x", 12345i64); + writer.append(record).unwrap(); + let bytes = writer.into_inner().unwrap(); + + let (header, blocks) = parse_ocf(&bytes).unwrap(); + assert_eq!(header.codec, OcfCodec::Snappy); + assert_eq!(blocks.len(), 1); + assert_eq!(blocks[0].object_count, 1); + } +} diff --git a/crates/paimon/src/spec/avro/schema.rs b/crates/paimon/src/spec/avro/schema.rs new file mode 100644 index 00000000..65810ddf --- /dev/null +++ b/crates/paimon/src/spec/avro/schema.rs @@ -0,0 +1,401 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::Error; +use serde_json::Value; + +/// Lightweight representation of an Avro writer schema, used to build +/// field-index mappings for schema-evolution-aware decoding. +#[derive(Debug)] +pub struct WriterSchema { + /// The top-level record fields (after unwrapping a union wrapper if present). + pub fields: Vec, + /// Whether the original schema was a union (e.g. `["null", record]`). + pub is_union_wrapped: bool, +} + +#[derive(Debug)] +pub struct WriterField { + pub name: String, + pub schema: FieldSchema, + pub nullable: bool, +} + +/// Simplified Avro field schema — just enough to know how to skip unknown fields. 
+#[derive(Debug)] +pub enum FieldSchema { + Null, + Boolean, + Int, + Long, + Float, + Double, + Bytes, + String, + Fixed(usize), + Enum { symbols_count: usize }, + Union(Vec), + Array(Box), + Map(Box), + Record(WriterSchema), +} + +impl WriterSchema { + pub fn parse(json: &str) -> crate::Result { + let value: Value = serde_json::from_str(json).map_err(|e| Error::UnexpectedError { + message: format!("avro schema: invalid JSON: {e}"), + source: None, + })?; + parse_record_from_value(&value) + } +} + +fn parse_record_from_value(value: &Value) -> crate::Result { + match value { + Value::Object(obj) if obj.get("type").and_then(|t| t.as_str()) == Some("record") => { + let mut ws = parse_record_object(obj)?; + ws.is_union_wrapped = false; + Ok(ws) + } + Value::Array(arr) => { + for item in arr { + if let Value::Object(obj) = item { + if obj.get("type").and_then(|t| t.as_str()) == Some("record") { + let mut ws = parse_record_object(obj)?; + ws.is_union_wrapped = true; + return Ok(ws); + } + } + } + Err(Error::UnexpectedError { + message: "avro schema: union does not contain a record type".into(), + source: None, + }) + } + _ => Err(Error::UnexpectedError { + message: "avro schema: expected record or union containing record".into(), + source: None, + }), + } +} + +fn parse_record_object(obj: &serde_json::Map) -> crate::Result { + let fields_arr = obj + .get("fields") + .and_then(|f| f.as_array()) + .ok_or_else(|| Error::UnexpectedError { + message: "avro schema: record missing 'fields' array".into(), + source: None, + })?; + + let mut fields = Vec::with_capacity(fields_arr.len()); + + for field_val in fields_arr.iter() { + let field_obj = field_val + .as_object() + .ok_or_else(|| Error::UnexpectedError { + message: "avro schema: field is not an object".into(), + source: None, + })?; + let name = field_obj + .get("name") + .and_then(|n| n.as_str()) + .ok_or_else(|| Error::UnexpectedError { + message: "avro schema: field missing 'name'".into(), + source: None, + })? 
+ .to_string(); + let type_val = field_obj + .get("type") + .ok_or_else(|| Error::UnexpectedError { + message: format!("avro schema: field '{name}' missing 'type'"), + source: None, + })?; + let schema = parse_field_schema(type_val)?; + let (schema, nullable) = unwrap_nullable(schema); + fields.push(WriterField { + name, + schema, + nullable, + }); + } + + Ok(WriterSchema { + fields, + is_union_wrapped: false, + }) +} + +/// For a nullable union `["null", T]`, extract the inner type `T` and return `(T, true)`. +/// For non-nullable schemas, return `(schema, false)`. +fn unwrap_nullable(schema: FieldSchema) -> (FieldSchema, bool) { + if let FieldSchema::Union(branches) = &schema { + let has_null = branches.iter().any(|b| matches!(b, FieldSchema::Null)); + if has_null && branches.len() == 2 { + let FieldSchema::Union(mut branches) = schema else { + unreachable!() + }; + let inner = if matches!(branches[0], FieldSchema::Null) { + branches.swap_remove(1) + } else { + branches.swap_remove(0) + }; + return (inner, true); + } + } + (schema, false) +} + +fn parse_field_schema(value: &Value) -> crate::Result { + match value { + Value::String(s) => match s.as_str() { + "null" => Ok(FieldSchema::Null), + "boolean" => Ok(FieldSchema::Boolean), + "int" => Ok(FieldSchema::Int), + "long" => Ok(FieldSchema::Long), + "float" => Ok(FieldSchema::Float), + "double" => Ok(FieldSchema::Double), + "bytes" => Ok(FieldSchema::Bytes), + "string" => Ok(FieldSchema::String), + _ => Err(Error::UnexpectedError { + message: format!("avro schema: unsupported named type reference: {s}"), + source: None, + }), + }, + Value::Object(obj) => { + let type_str = obj.get("type").and_then(|t| t.as_str()).unwrap_or(""); + match type_str { + "record" => { + let record = parse_record_object(obj)?; + Ok(FieldSchema::Record(record)) + } + "array" => { + let items = obj.get("items").ok_or_else(|| Error::UnexpectedError { + message: "avro schema: array missing 'items'".into(), + source: None, + })?; + let 
item_schema = parse_field_schema(items)?; + Ok(FieldSchema::Array(Box::new(item_schema))) + } + "map" => { + let values = obj.get("values").ok_or_else(|| Error::UnexpectedError { + message: "avro schema: map missing 'values'".into(), + source: None, + })?; + let value_schema = parse_field_schema(values)?; + Ok(FieldSchema::Map(Box::new(value_schema))) + } + "fixed" => { + let size = obj.get("size").and_then(|s| s.as_u64()).ok_or_else(|| { + Error::UnexpectedError { + message: "avro schema: fixed missing 'size'".into(), + source: None, + } + })? as usize; + Ok(FieldSchema::Fixed(size)) + } + "long" | "int" => { + if type_str == "long" { + Ok(FieldSchema::Long) + } else { + Ok(FieldSchema::Int) + } + } + "enum" => { + let symbols = + obj.get("symbols") + .and_then(|s| s.as_array()) + .ok_or_else(|| Error::UnexpectedError { + message: "avro schema: enum missing 'symbols'".into(), + source: None, + })?; + Ok(FieldSchema::Enum { + symbols_count: symbols.len(), + }) + } + _ => Err(Error::UnexpectedError { + message: format!("avro schema: unsupported type: {type_str}"), + source: None, + }), + } + } + Value::Array(arr) => { + let branches: Vec = arr + .iter() + .map(parse_field_schema) + .collect::>()?; + Ok(FieldSchema::Union(branches)) + } + _ => Err(Error::UnexpectedError { + message: format!("avro schema: unexpected value: {value}"), + source: None, + }), + } +} + +/// Skip a value in the cursor according to its schema. +pub fn skip_field( + cursor: &mut super::cursor::AvroCursor, + schema: &FieldSchema, +) -> crate::Result<()> { + skip_field_inner(cursor, schema, false) +} + +/// Skip a field that may have been unwrapped from a nullable union. +/// When `nullable` is true, reads the union index first. 
pub fn skip_nullable_field(
    cursor: &mut super::cursor::AvroCursor,
    schema: &FieldSchema,
    nullable: bool,
) -> crate::Result<()> {
    skip_field_inner(cursor, schema, nullable)
}

/// Core skip routine shared by `skip_field` and `skip_nullable_field`.
/// When `nullable` is true the union branch index is consumed first; branch 0
/// is null and carries no further bytes.
fn skip_field_inner(
    cursor: &mut super::cursor::AvroCursor,
    schema: &FieldSchema,
    nullable: bool,
) -> crate::Result<()> {
    if nullable {
        let idx = cursor.read_union_index()?;
        if idx == 0 {
            return Ok(());
        }
    }
    match schema {
        FieldSchema::Null => Ok(()),
        FieldSchema::Boolean => cursor.skip_raw(1),
        // int, long, and enum indices are all zigzag varints.
        FieldSchema::Int | FieldSchema::Long | FieldSchema::Enum { .. } => cursor.skip_long(),
        FieldSchema::Float => cursor.skip_raw(4),
        FieldSchema::Double => cursor.skip_raw(8),
        FieldSchema::Bytes | FieldSchema::String => cursor.skip_bytes(),
        FieldSchema::Fixed(size) => cursor.skip_raw(*size),
        FieldSchema::Union(branches) => {
            let idx = cursor.read_union_index()? as usize;
            if idx < branches.len() {
                skip_field(cursor, &branches[idx])
            } else {
                Err(Error::UnexpectedError {
                    message: format!("avro skip: union index {idx} out of range"),
                    source: None,
                })
            }
        }
        FieldSchema::Array(item_schema) => {
            loop {
                let count = cursor.read_long()?;
                if count == 0 {
                    break;
                }
                // A negative count means the block's byte size follows and the
                // item count is |count|. unsigned_abs is overflow-safe even for
                // i64::MIN, unlike unary negation.
                let count = if count < 0 {
                    cursor.skip_long()?;
                    count.unsigned_abs() as usize
                } else {
                    count as usize
                };
                for _ in 0..count {
                    skip_field(cursor, item_schema)?;
                }
            }
            Ok(())
        }
        FieldSchema::Map(value_schema) => {
            loop {
                let count = cursor.read_long()?;
                if count == 0 {
                    break;
                }
                // Same negative-count block encoding as arrays.
                let count = if count < 0 {
                    cursor.skip_long()?;
                    count.unsigned_abs() as usize
                } else {
                    count as usize
                };
                for _ in 0..count {
                    cursor.skip_bytes()?; // map key (string)
                    skip_field(cursor, value_schema)?;
                }
            }
            Ok(())
        }
        FieldSchema::Record(record_schema) => {
            for field in &record_schema.fields {
                skip_field_inner(cursor, &field.schema, field.nullable)?;
            }
            Ok(())
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_manifest_file_meta_schema() {
        let
schema_json = r#"["null", { + "type": "record", + "name": "record", + "namespace": "org.apache.paimon.avro.generated", + "fields": [ + {"name": "_VERSION", "type": "int"}, + {"name": "_FILE_NAME", "type": "string"}, + {"name": "_FILE_SIZE", "type": "long"}, + {"name": "_NUM_ADDED_FILES", "type": "long"}, + {"name": "_NUM_DELETED_FILES", "type": "long"}, + {"name": "_PARTITION_STATS", "type": ["null", { + "type": "record", + "name": "record__PARTITION_STATS", + "fields": [ + {"name": "_MIN_VALUES", "type": "bytes"}, + {"name": "_MAX_VALUES", "type": "bytes"}, + {"name": "_NULL_COUNTS", "type": ["null", {"type": "array", "items": ["null", "long"]}], "default": null} + ] + }], "default": null}, + {"name": "_SCHEMA_ID", "type": "long"} + ] + }]"#; + + let ws = WriterSchema::parse(schema_json).unwrap(); + assert_eq!(ws.fields.len(), 7); + assert!(ws.is_union_wrapped); + assert_eq!(ws.fields[0].name, "_VERSION"); + assert_eq!(ws.fields[1].name, "_FILE_NAME"); + assert_eq!(ws.fields[6].name, "_SCHEMA_ID"); + } + + #[test] + fn test_parse_nested_record() { + let schema_json = r#"{"type": "record", "name": "test", "fields": [ + {"name": "nested", "type": ["null", {"type": "record", "name": "inner", "fields": [ + {"name": "x", "type": "int"} + ]}]} + ]}"#; + + let ws = WriterSchema::parse(schema_json).unwrap(); + assert!(!ws.is_union_wrapped); + assert_eq!(ws.fields.len(), 1); + assert!(ws.fields[0].nullable); + match &ws.fields[0].schema { + FieldSchema::Record(inner) => { + assert_eq!(inner.fields.len(), 1); + assert_eq!(inner.fields[0].name, "x"); + } + _ => panic!("expected record"), + } + } +} diff --git a/crates/paimon/src/spec/index_manifest.rs b/crates/paimon/src/spec/index_manifest.rs index 8ca5b865..25994437 100644 --- a/crates/paimon/src/spec/index_manifest.rs +++ b/crates/paimon/src/spec/index_manifest.rs @@ -118,16 +118,13 @@ impl IndexManifest { /// Read index manifest entries from a file. 
pub async fn read(file_io: &FileIO, path: &str) -> Result<Vec<IndexManifestEntry>> { let input_file = file_io.new_input(path)?; - if !input_file.exists().await? { - return Ok(Vec::new()); - } let content = input_file.read().await?; Self::read_from_bytes(&content) } /// Read index manifest entries from Avro-encoded bytes. pub fn read_from_bytes(bytes: &[u8]) -> Result<Vec<IndexManifestEntry>> { - crate::spec::from_avro_bytes(bytes) + crate::spec::avro::from_avro_bytes_fast(bytes) } /// Write index manifest entries to a file. diff --git a/crates/paimon/src/spec/manifest.rs b/crates/paimon/src/spec/manifest.rs index 193359e5..8131e7a0 100644 --- a/crates/paimon/src/spec/manifest.rs +++ b/crates/paimon/src/spec/manifest.rs @@ -16,8 +16,10 @@ // under the License. use crate::io::FileIO; +use crate::spec::avro::SchemaCache; use crate::spec::manifest_entry::ManifestEntry; use crate::spec::manifest_entry::MANIFEST_ENTRY_SCHEMA; +use crate::spec::FileKind; use crate::Result; @@ -33,18 +35,29 @@ impl Manifest { /// Read manifest entries from a file. pub async fn read(file_io: &FileIO, path: &str) -> Result<Vec<ManifestEntry>> { let input_file = file_io.new_input(path)?; - - if !input_file.exists().await? { - return Ok(Vec::new()); - } - let content = input_file.read().await?; Self::read_from_bytes(&content) } /// Read manifest entries from bytes. fn read_from_bytes(bytes: &[u8]) -> Result<Vec<ManifestEntry>> { - crate::spec::from_avro_bytes(bytes) + crate::spec::avro::from_avro_bytes_fast(bytes) + } + + /// Read manifest entries with a lightweight filter on (kind, partition, bucket, total_buckets). + /// Entries that fail the filter skip DataFileMeta decoding entirely. + pub async fn read_filtered<F>( + file_io: &FileIO, + path: &str, + cache: &mut SchemaCache, + filter: &mut F, + ) -> Result<Vec<ManifestEntry>> + where + F: FnMut(FileKind, &[u8], i32, i32) -> bool, + { + let input_file = file_io.new_input(path)?; + let content = input_file.read().await?; + crate::spec::avro::from_manifest_bytes_filtered(&content, cache, filter) } /// Write manifest entries to a file. 
diff --git a/crates/paimon/src/spec/manifest_common.rs b/crates/paimon/src/spec/manifest_common.rs index bfac29bf..1a332756 100644 --- a/crates/paimon/src/spec/manifest_common.rs +++ b/crates/paimon/src/spec/manifest_common.rs @@ -19,7 +19,7 @@ use serde_repr::{Deserialize_repr, Serialize_repr}; /// Kind of a file. /// Impl Reference: -#[derive(PartialEq, Eq, Debug, Clone, Serialize_repr, Deserialize_repr)] +#[derive(PartialEq, Eq, Debug, Clone, Copy, Serialize_repr, Deserialize_repr)] #[repr(u8)] pub enum FileKind { Add = 0, diff --git a/crates/paimon/src/spec/manifest_entry.rs b/crates/paimon/src/spec/manifest_entry.rs index cbe295b2..1d02fb7f 100644 --- a/crates/paimon/src/spec/manifest_entry.rs +++ b/crates/paimon/src/spec/manifest_entry.rs @@ -18,11 +18,12 @@ use crate::spec::manifest_common::FileKind; use crate::spec::DataFileMeta; use serde::{Deserialize, Serialize}; +use std::hash::{Hash, Hasher}; /// The same {@link Identifier} indicates that the {@link ManifestEntry} refers to the same data file. /// /// Impl Reference: -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct Identifier { pub partition: Vec<u8>, pub bucket: i32, @@ -33,6 +34,14 @@ pub struct Identifier { pub external_path: Option<String>, } +impl Hash for Identifier { + fn hash<H: Hasher>(&self, state: &mut H) { + self.partition.hash(state); + self.bucket.hash(state); + self.file_name.hash(state); + } + } + /// Entry of a manifest file, representing an addition / deletion of a data file. 
/// Impl Reference: #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -99,6 +108,23 @@ impl ManifestEntry { } } + pub(crate) fn into_identifier(self) -> Identifier { + Identifier { + partition: self.partition, + bucket: self.bucket, + level: self.file.level, + file_name: self.file.file_name, + extra_files: self.file.extra_files, + embedded_index: self.file.embedded_index, + external_path: self.file.external_path, + } + } + + /// Consume the entry and return (partition, bucket, total_buckets, file). + pub(crate) fn into_parts(self) -> (Vec<u8>, i32, i32, DataFileMeta) { + (self.partition, self.bucket, self.total_buckets, self.file) + } + pub fn total_buckets(&self) -> i32 { self.total_buckets } diff --git a/crates/paimon/src/spec/manifest_file_meta.rs b/crates/paimon/src/spec/manifest_file_meta.rs index b74389eb..ee630f8f 100644 --- a/crates/paimon/src/spec/manifest_file_meta.rs +++ b/crates/paimon/src/spec/manifest_file_meta.rs @@ -113,6 +113,27 @@ impl ManifestFileMeta { schema_id, } } + + #[inline] + pub(crate) fn new_with_version( + version: i32, + file_name: String, + file_size: i64, + num_added_files: i64, + num_deleted_files: i64, + partition_stats: BinaryTableStats, + schema_id: i64, + ) -> ManifestFileMeta { + Self { + version, + file_name, + file_size, + num_added_files, + num_deleted_files, + partition_stats, + schema_id, + } + } } /// Avro schema for ManifestFileMeta (used in manifest-list files). diff --git a/crates/paimon/src/spec/manifest_list.rs b/crates/paimon/src/spec/manifest_list.rs index 050edf46..16eb0499 100644 --- a/crates/paimon/src/spec/manifest_list.rs +++ b/crates/paimon/src/spec/manifest_list.rs @@ -32,11 +32,8 @@ impl ManifestList { /// Read manifest file metas from a manifest list file. pub async fn read(file_io: &FileIO, path: &str) -> Result<Vec<ManifestFileMeta>> { let input = file_io.new_input(path)?; - if !input.exists().await? 
{ - return Ok(Vec::new()); - } let content = input.read().await?; - crate::spec::from_avro_bytes(&content) + crate::spec::avro::from_avro_bytes_fast(&content) } /// Write manifest file metas to a manifest list file. @@ -98,10 +95,8 @@ mod tests { #[tokio::test] async fn test_manifest_list_read_nonexistent() { let file_io = test_file_io(); - let result = ManifestList::read(&file_io, "memory:/nonexistent/manifest-list") - .await - .unwrap(); - assert!(result.is_empty()); + let result = ManifestList::read(&file_io, "memory:/nonexistent/manifest-list").await; + assert!(result.is_err()); } #[tokio::test] diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs index 5d2593b0..e9bc085f 100644 --- a/crates/paimon/src/spec/mod.rs +++ b/crates/paimon/src/spec/mod.rs @@ -61,6 +61,7 @@ pub use manifest_list::ManifestList; mod objects_file; pub use objects_file::from_avro_bytes; pub use objects_file::to_avro_bytes; +pub(crate) mod avro; pub(crate) mod stats; mod types; pub use types::*; diff --git a/crates/paimon/src/table/bucket_filter.rs b/crates/paimon/src/table/bucket_filter.rs index 80942d3e..7091578e 100644 --- a/crates/paimon/src/table/bucket_filter.rs +++ b/crates/paimon/src/table/bucket_filter.rs @@ -168,26 +168,20 @@ fn collect_eq_candidates<'a>( op, literals, .. 
- } => { - if *index < field_candidates.len() { - match op { - PredicateOperator::Eq => { - if let Some(lit) = literals.first() { - field_candidates[*index] = Some(vec![Some(lit)]); - } - } - PredicateOperator::In => { - if !literals.is_empty() { - field_candidates[*index] = Some(literals.iter().map(Some).collect()); - } - } - PredicateOperator::IsNull => { - field_candidates[*index] = Some(vec![None]); - } - _ => {} + } if *index < field_candidates.len() => match op { + PredicateOperator::Eq => { + if let Some(lit) = literals.first() { + field_candidates[*index] = Some(vec![Some(lit)]); } } - } + PredicateOperator::In if !literals.is_empty() => { + field_candidates[*index] = Some(literals.iter().map(Some).collect()); + } + PredicateOperator::IsNull => { + field_candidates[*index] = Some(vec![None]); + } + _ => {} + }, _ => {} } } diff --git a/crates/paimon/src/table/data_evolution_reader.rs b/crates/paimon/src/table/data_evolution_reader.rs index 107add2d..487ed951 100644 --- a/crates/paimon/src/table/data_evolution_reader.rs +++ b/crates/paimon/src/table/data_evolution_reader.rs @@ -948,7 +948,7 @@ fn normalize_merge_group(files: Vec) -> crate::Result crate::Result> { let num_fields = partition_fields.len(); - let mut field_values: Vec>> = vec![Vec::new(); num_fields]; + let partition_count = partitions.len(); + let mut field_values: Vec>> = (0..num_fields) + .map(|_| Vec::with_capacity(partition_count)) + .collect(); for bytes in partitions { let row = BinaryRow::from_serialized_bytes(bytes)?; @@ -283,26 +286,20 @@ fn collect_eq_candidates<'a>( op, literals, .. 
- } => { - if *index < field_candidates.len() { - match op { - PredicateOperator::Eq => { - if let Some(lit) = literals.first() { - field_candidates[*index] = Some(vec![Some(lit)]); - } - } - PredicateOperator::In => { - if !literals.is_empty() { - field_candidates[*index] = Some(literals.iter().map(Some).collect()); - } - } - PredicateOperator::IsNull => { - field_candidates[*index] = Some(vec![None]); - } - _ => {} + } if *index < field_candidates.len() => match op { + PredicateOperator::Eq => { + if let Some(lit) = literals.first() { + field_candidates[*index] = Some(vec![Some(lit)]); } } - } + PredicateOperator::In if !literals.is_empty() => { + field_candidates[*index] = Some(literals.iter().map(Some).collect()); + } + PredicateOperator::IsNull => { + field_candidates[*index] = Some(vec![None]); + } + _ => {} + }, _ => {} } } diff --git a/crates/paimon/src/table/stats_filter.rs b/crates/paimon/src/table/stats_filter.rs index 40ce43a9..68a4d656 100644 --- a/crates/paimon/src/table/stats_filter.rs +++ b/crates/paimon/src/table/stats_filter.rs @@ -61,18 +61,26 @@ impl FileStatsRows { /// When `value_stats_cols` is `Some`, stats are in dense mode — only covering those /// columns, and the mapping from schema field index to stats index is built by name. pub(super) fn from_data_file(file: &DataFileMeta, schema_fields: &[DataField]) -> Self { - // Determine which columns the stats cover and build the mapping. - // Priority: value_stats_cols > write_cols > all schema fields. 
let stats_col_mapping = if let Some(cols) = &file.value_stats_cols { + let col_index: HashMap<&str, usize> = cols + .iter() + .enumerate() + .map(|(i, c)| (c.as_str(), i)) + .collect(); let mapping: Vec<Option<usize>> = schema_fields .iter() - .map(|field| cols.iter().position(|c| c == field.name())) + .map(|field| col_index.get(field.name()).copied()) .collect(); Some(mapping) } else if let Some(cols) = &file.write_cols { + let col_index: HashMap<&str, usize> = cols + .iter() + .enumerate() + .map(|(i, c)| (c.as_str(), i)) + .collect(); let mapping: Vec<Option<usize>> = schema_fields .iter() - .map(|field| cols.iter().position(|c| c == field.name())) + .map(|field| col_index.get(field.name()).copied()) .collect(); Some(mapping) } else { @@ -292,27 +300,30 @@ pub(super) fn data_evolution_group_matches_predicates( // Sort files by max_sequence_number descending so the highest-seq file wins per field. let mut sorted_files: Vec<&DataFileMeta> = group.iter().collect(); - sorted_files.sort_by(|a, b| b.max_sequence_number.cmp(&a.max_sequence_number)); + sorted_files.sort_by_key(|f| std::cmp::Reverse(f.max_sequence_number)); // For each table field, find which file (index in sorted_files) provides it. // Use file_data_columns (based on write_cols) to determine which file contains // the field, not file_stats_columns (based on value_stats_cols) which only // indicates stats coverage. 
- let field_sources: Vec> = table_fields - .iter() - .enumerate() - .map(|(field_idx, field)| { - for (file_idx, file) in sorted_files.iter().enumerate() { - let file_columns = file_data_columns(file, table_fields); - for col_name in &file_columns { - if *col_name == field.name() { + let field_sources: Vec> = { + let per_file_columns: Vec> = sorted_files + .iter() + .map(|file| file_data_columns(file, table_fields)) + .collect(); + table_fields + .iter() + .enumerate() + .map(|(field_idx, field)| { + for (file_idx, cols) in per_file_columns.iter().enumerate() { + if cols.iter().any(|c| *c == field.name()) { return Some((file_idx, field_idx)); } } - } - None - }) - .collect(); + None + }) + .collect() + }; // Build per-file stats without arity validation — data evolution files // may have fewer columns than the current table schema. diff --git a/crates/paimon/src/table/table_commit.rs b/crates/paimon/src/table/table_commit.rs index af3e56e9..47cc0fcc 100644 --- a/crates/paimon/src/table/table_commit.rs +++ b/crates/paimon/src/table/table_commit.rs @@ -1409,7 +1409,11 @@ mod tests { let mut initial_file = test_data_file("data-0.parquet", 100); initial_file.file_source = Some(0); // APPEND commit - .commit(vec![CommitMessage::new(vec![], 0, vec![initial_file])]) + .commit(vec![CommitMessage::new( + vec![0, 0, 0, 0], + 0, + vec![initial_file], + )]) .await .unwrap(); @@ -1426,7 +1430,11 @@ mod tests { partial_file.write_cols = Some(vec!["name".to_string()]); let result = commit - .commit(vec![CommitMessage::new(vec![], 0, vec![partial_file])]) + .commit(vec![CommitMessage::new( + vec![0, 0, 0, 0], + 0, + vec![partial_file], + )]) .await; assert!(result.is_err()); @@ -1450,7 +1458,11 @@ mod tests { let mut initial_file = test_data_file("data-0.parquet", 100); initial_file.file_source = Some(0); commit - .commit(vec![CommitMessage::new(vec![], 0, vec![initial_file])]) + .commit(vec![CommitMessage::new( + vec![0, 0, 0, 0], + 0, + vec![initial_file], + )]) .await 
.unwrap(); @@ -1461,7 +1473,11 @@ mod tests { partial_file.write_cols = Some(vec!["name".to_string()]); commit - .commit(vec![CommitMessage::new(vec![], 0, vec![partial_file])]) + .commit(vec![CommitMessage::new( + vec![0, 0, 0, 0], + 0, + vec![partial_file], + )]) .await .unwrap(); @@ -1486,7 +1502,11 @@ mod tests { partial_file.write_cols = Some(vec!["name".to_string()]); let result = commit - .commit(vec![CommitMessage::new(vec![], 0, vec![partial_file])]) + .commit(vec![CommitMessage::new( + vec![0, 0, 0, 0], + 0, + vec![partial_file], + )]) .await; assert!(result.is_err()); @@ -1562,7 +1582,11 @@ mod tests { .await .unwrap(); - let mut msg = CommitMessage::new(vec![], 0, vec![test_data_file("data-new.parquet", 80)]); + let mut msg = CommitMessage::new( + vec![0, 0, 0, 0], + 0, + vec![test_data_file("data-new.parquet", 80)], + ); msg.deleted_files = vec![test_data_file("nonexistent.parquet", 100)]; let result = commit.commit(vec![msg]).await; @@ -1591,7 +1615,11 @@ mod tests { .await .unwrap(); - let mut msg = CommitMessage::new(vec![], 0, vec![test_data_file("data-new.parquet", 80)]); + let mut msg = CommitMessage::new( + vec![0, 0, 0, 0], + 0, + vec![test_data_file("data-new.parquet", 80)], + ); msg.deleted_files = vec![test_data_file("data-0.parquet", 100)]; commit.commit(vec![msg]).await.unwrap(); @@ -1612,7 +1640,7 @@ mod tests { let commit = setup_commit(&file_io, table_path); - let mut msg = CommitMessage::new(vec![], 0, vec![]); + let mut msg = CommitMessage::new(vec![0, 0, 0, 0], 0, vec![]); msg.deleted_files = vec![test_data_file("data-0.parquet", 100)]; let result = commit.commit(vec![msg]).await; diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs index 5376c4f2..02117040 100644 --- a/crates/paimon/src/table/table_scan.rs +++ b/crates/paimon/src/table/table_scan.rs @@ -30,8 +30,9 @@ use super::stats_filter::{ use super::Table; use crate::io::FileIO; use crate::spec::{ - bucket_dir_name, BinaryRow, CoreOptions, 
DataField, DataFileMeta, FileKind, IndexManifest, - ManifestEntry, PartitionComputer, Predicate, Snapshot, TimeTravelSelector, + avro::SharedSchemaCache, bucket_dir_name, BinaryRow, CoreOptions, DataField, DataFileMeta, + FileKind, IndexManifest, ManifestEntry, PartitionComputer, Predicate, Snapshot, + TimeTravelSelector, }; use crate::table::bin_pack::split_for_batch; use crate::table::source::{ @@ -56,6 +57,9 @@ async fn read_manifest_list( table_path: &str, list_name: &str, ) -> crate::Result<Vec<ManifestFileMeta>> { + if list_name.is_empty() { + return Ok(Vec::new()); + } let path = format!( "{}/{}/{}", table_path.trim_end_matches('/'), @@ -63,11 +67,8 @@ list_name ); let input = file_io.new_input(&path)?; - if !input.exists().await? { - return Ok(Vec::new()); - } let bytes = input.read().await?; - crate::spec::from_avro_bytes::<ManifestFileMeta>(&bytes) + crate::spec::avro::from_avro_bytes_fast::<ManifestFileMeta>(&bytes) } /// Reads all manifest entries for a snapshot (base + delta manifest lists, then each manifest file). 
@@ -92,9 +93,10 @@ async fn read_all_manifest_entries( bucket_predicate: Option<&Predicate>, bucket_key_fields: &[DataField], ) -> crate::Result> { - let mut manifest_files = - read_manifest_list(file_io, table_path, snapshot.base_manifest_list()).await?; - let delta = read_manifest_list(file_io, table_path, snapshot.delta_manifest_list()).await?; + let (mut manifest_files, delta) = futures::try_join!( + read_manifest_list(file_io, table_path, snapshot.base_manifest_list()), + read_manifest_list(file_io, table_path, snapshot.delta_manifest_list()), + )?; manifest_files.extend(delta); // Manifest-file-level partition stats pruning: skip entire manifest files @@ -118,40 +120,57 @@ async fn read_all_manifest_entries( } let manifest_path_prefix = format!("{}/{}", table_path.trim_end_matches('/'), MANIFEST_DIR); + let shared_cache = SharedSchemaCache::new(); let all_entries: Vec = futures::stream::iter(manifest_files) .map(|meta| { let path = format!("{}/{}", manifest_path_prefix, meta.file_name()); + let cache = shared_cache.clone(); async move { - let entries = crate::spec::Manifest::read(file_io, &path).await?; + let input_file = file_io.new_input(&path)?; + let content = input_file.read().await?; + // Per-task bucket cache (few distinct total_buckets values per manifest). 
let mut bucket_cache: HashMap>> = HashMap::new(); - let filtered: Vec = entries - .into_iter() - .filter(|entry| { - if skip_level_zero && has_primary_keys && entry.file().level == 0 { - return false; - } - if has_primary_keys && !scan_all_files && entry.bucket() < 0 { + + let entries = crate::spec::avro::from_manifest_bytes_filtered_shared( + &content, + &cache, + &mut |_kind, partition_bytes, bucket, total_buckets| { + // Bucket filter (negative bucket = unassigned) + if has_primary_keys && !scan_all_files && bucket < 0 { return false; } if let Some(pred) = bucket_predicate { - let total = entry.total_buckets(); - let targets = bucket_cache.entry(total).or_insert_with(|| { - compute_target_buckets(pred, bucket_key_fields, total) + let targets = bucket_cache.entry(total_buckets).or_insert_with(|| { + compute_target_buckets(pred, bucket_key_fields, total_buckets) }); if let Some(targets) = targets { - if !targets.contains(&entry.bucket()) { + if !targets.contains(&bucket) { return false; } } } + + // Partition filter if let Some(pf) = partition_filter { - match pf.matches_entry(entry.partition()) { + match pf.matches_entry(partition_bytes) { Ok(false) => return false, Ok(true) => {} Err(_) => {} } } + + true + }, + )?; + + // Post-filter: level-0 and data predicates (need DataFileMeta) + let filtered: Vec = entries + .into_iter() + .filter(|entry| { + if skip_level_zero && has_primary_keys && entry.file().level == 0 { + return false; + } if !data_predicates.is_empty() && !data_file_matches_predicates( entry.file(), @@ -186,7 +205,8 @@ fn build_deletion_files_map( use crate::spec::FileKind; let table_path = table_path.trim_end_matches('/'); let index_path_prefix = format!("{table_path}/{INDEX_DIR}"); - let mut map: HashMap> = HashMap::new(); + let mut map: HashMap> = + HashMap::with_capacity(index_entries.len()); for entry in index_entries { if entry.kind != FileKind::Add { continue; @@ -221,21 +241,34 @@ fn build_deletion_files_map( /// The identifier must be rich 
enough to match Paimon's file identity, otherwise a delete /// for one file version can incorrectly remove another with the same file name. fn merge_manifest_entries(entries: Vec) -> Vec { - let mut deleted_entry_keys = HashSet::new(); - let mut added_entries = Vec::new(); + let mut delete_entries = Vec::with_capacity(entries.len() / 4); + let mut added_entries = Vec::with_capacity(entries.len()); for entry in entries { match entry.kind() { FileKind::Add => added_entries.push(entry), - FileKind::Delete => { - deleted_entry_keys.insert(entry.identifier()); - } + FileKind::Delete => delete_entries.push(entry), } } + if delete_entries.is_empty() { + return added_entries; + } + + let deleted_keys: HashSet<(&[u8], i32, &str)> = delete_entries + .iter() + .map(|e| (e.partition(), e.bucket(), e.file().file_name.as_str())) + .collect(); + added_entries .into_iter() - .filter(|entry| !deleted_entry_keys.contains(&entry.identifier())) + .filter(|entry| { + !deleted_keys.contains(&( + entry.partition(), + entry.bucket(), + entry.file().file_name.as_str(), + )) + }) .collect() } @@ -556,16 +589,20 @@ impl<'a> TableScan<'a> { return Ok(Plan::new(Vec::new())); } - // Group by (partition, bucket). Key = (partition_bytes, bucket). - let mut groups: HashMap<(Vec, i32), Vec> = HashMap::new(); + // Group by (partition, bucket), decomposing entries to avoid cloning partition. 
+ let mut groups: HashMap<(Vec, i32), (i32, Vec)> = + HashMap::with_capacity(entries.len()); for e in entries { - let key = (e.partition().to_vec(), e.bucket()); - groups.entry(key).or_default().push(e); + let (partition, bucket, total_buckets, file) = e.into_parts(); + let entry = groups + .entry((partition, bucket)) + .or_insert_with(|| (total_buckets, Vec::new())); + entry.1.push(file); } let snapshot_id = snapshot.id(); let base_path = table_path.trim_end_matches('/'); - let mut splits = Vec::new(); + let mut splits = Vec::with_capacity(groups.len()); let partition_computer = if !partition_keys.is_empty() { Some(PartitionComputer::new( @@ -610,24 +647,9 @@ impl<'a> TableScan<'a> { (None, self.row_ranges.clone()) }; - for ((partition, bucket), group_entries) in groups { + for ((partition, bucket), (total_buckets, data_files)) in groups { let partition_row = BinaryRow::from_serialized_bytes(&partition)?; - let total_buckets = group_entries - .first() - .map(|e| e.total_buckets()) - .ok_or_else(|| Error::UnexpectedError { - message: format!("Manifest entry group for bucket {bucket} is empty, cannot determine total_buckets"), - source: None, - })?; - let data_files: Vec<_> = group_entries - .into_iter() - .map(|e| { - let ManifestEntry { file, .. } = e; - file - }) - .collect(); - let bucket_path = if let Some(ref computer) = partition_computer { let partition_path = computer.generate_partition_path(&partition_row)?; format!("{base_path}/{partition_path}{}", bucket_dir_name(bucket))