From f8c15ee3d3c5dd991e908155b9334645419ba6e0 Mon Sep 17 00:00:00 2001 From: "xuan.zhao" Date: Thu, 23 Apr 2026 18:16:33 +0800 Subject: [PATCH] feat(puffin): add puffin file reader and writer - PuffinWriter: in-memory writer that builds complete Puffin files - Add() writes blobs with optional compression - Finish() serializes footer with JSON metadata - Tracks BlobMetadata for all written blobs - PuffinReader: in-memory reader that parses Puffin files - ReadFileMetadata() parses footer and validates magic bytes - ReadBlob() reads and decompresses individual blobs - ReadAll() reads all blobs from metadata - Expose Compress/Decompress as public API in puffin_format.h - Register new sources in CMake and Meson build systems - Add comprehensive tests including Java binary compatibility --- src/iceberg/CMakeLists.txt | 2 + src/iceberg/meson.build | 2 + src/iceberg/puffin/meson.build | 8 +- src/iceberg/puffin/puffin_format.cc | 24 +- src/iceberg/puffin/puffin_format.h | 10 + src/iceberg/puffin/puffin_reader.cc | 150 ++++++ src/iceberg/puffin/puffin_reader.h | 66 +++ src/iceberg/puffin/puffin_writer.cc | 124 +++++ src/iceberg/puffin/puffin_writer.h | 80 ++++ src/iceberg/test/CMakeLists.txt | 6 +- src/iceberg/test/meson.build | 6 +- src/iceberg/test/puffin_reader_writer_test.cc | 433 ++++++++++++++++++ 12 files changed, 896 insertions(+), 15 deletions(-) create mode 100644 src/iceberg/puffin/puffin_reader.cc create mode 100644 src/iceberg/puffin/puffin_reader.h create mode 100644 src/iceberg/puffin/puffin_writer.cc create mode 100644 src/iceberg/puffin/puffin_writer.h create mode 100644 src/iceberg/test/puffin_reader_writer_test.cc diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 603c35343..2f99eeeb1 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -68,6 +68,8 @@ set(ICEBERG_SOURCES partition_summary.cc puffin/file_metadata.cc puffin/puffin_format.cc + puffin/puffin_reader.cc + puffin/puffin_writer.cc puffin/json_serde.cc row/arrow_array_wrapper.cc row/manifest_wrapper.cc diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 5e68def98..b55f94b2a 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -91,6 +91,8 @@ iceberg_sources = files( 'puffin/file_metadata.cc', 'puffin/json_serde.cc', 'puffin/puffin_format.cc', + 'puffin/puffin_reader.cc', + 'puffin/puffin_writer.cc', 'row/arrow_array_wrapper.cc', 'row/manifest_wrapper.cc', 'row/partition_values.cc', diff --git a/src/iceberg/puffin/meson.build b/src/iceberg/puffin/meson.build index 7869d7b2c..7f30468db 100644 --- a/src/iceberg/puffin/meson.build +++ b/src/iceberg/puffin/meson.build @@ -16,6 +16,12 @@ # under the License. install_headers( - ['file_metadata.h', 'puffin_format.h', 'type_fwd.h'], + [ + 'file_metadata.h', + 'puffin_format.h', + 'puffin_reader.h', + 'puffin_writer.h', + 'type_fwd.h', + ], subdir: 'iceberg/puffin', ) diff --git a/src/iceberg/puffin/puffin_format.cc b/src/iceberg/puffin/puffin_format.cc index 88807d0ca..88d378f04 100644 --- a/src/iceberg/puffin/puffin_format.cc +++ b/src/iceberg/puffin/puffin_format.cc @@ -36,6 +36,18 @@ constexpr std::pair GetFlagPosition(PuffinFlag flag) { std::unreachable(); } +} // namespace + +bool IsFlagSet(std::span flags, PuffinFlag flag) { + auto [byte_num, bit_num] = GetFlagPosition(flag); + return (flags[byte_num] & (1 << bit_num)) != 0; +} + +void SetFlag(std::span flags, PuffinFlag flag) { + auto [byte_num, bit_num] = GetFlagPosition(flag); + flags[byte_num] |= (1 << bit_num); +} + // TODO(zhaoxuan1994): Move compression logic to a unified codec interface. Result> Compress(PuffinCompressionCodec codec, std::span input) { @@ -63,16 +75,4 @@ Result> Decompress(PuffinCompressionCodec codec, std::unreachable(); } -} // namespace - -bool IsFlagSet(std::span flags, PuffinFlag flag) { - auto [byte_num, bit_num] = GetFlagPosition(flag); - return (flags[byte_num] & (1 << bit_num)) != 0; -} - -void SetFlag(std::span flags, PuffinFlag flag) { - auto [byte_num, bit_num] = GetFlagPosition(flag); - flags[byte_num] |= (1 << bit_num); -} - } // namespace iceberg::puffin diff --git a/src/iceberg/puffin/puffin_format.h b/src/iceberg/puffin/puffin_format.h index 857c2ba57..05a27145e 100644 --- a/src/iceberg/puffin/puffin_format.h +++ b/src/iceberg/puffin/puffin_format.h @@ -23,8 +23,10 @@ /// Puffin file format constants and utilities. #include +#include #include #include +#include #include "iceberg/iceberg_export.h" #include "iceberg/puffin/file_metadata.h" @@ -66,4 +68,12 @@ ICEBERG_EXPORT bool IsFlagSet(std::span flags, PuffinFlag flag /// \brief Set a flag in the flags bytes. ICEBERG_EXPORT void SetFlag(std::span flags, PuffinFlag flag); +/// \brief Compress data using the specified codec. +ICEBERG_EXPORT Result> Compress(PuffinCompressionCodec codec, + std::span input); + +/// \brief Decompress data using the specified codec. +ICEBERG_EXPORT Result> Decompress( + PuffinCompressionCodec codec, std::span input); + } // namespace iceberg::puffin diff --git a/src/iceberg/puffin/puffin_reader.cc b/src/iceberg/puffin/puffin_reader.cc new file mode 100644 index 000000000..16e3cdb1b --- /dev/null +++ b/src/iceberg/puffin/puffin_reader.cc @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/puffin/puffin_reader.h" + +#include +#include +#include +#include + +#include "iceberg/puffin/json_serde_internal.h" +#include "iceberg/puffin/puffin_format.h" +#include "iceberg/util/endian.h" +#include "iceberg/util/macros.h" + +namespace iceberg::puffin { + +namespace { + +// Validate magic bytes at the given offset. +Status CheckMagic(std::span data, int64_t offset) { + if (offset < 0 || + offset + PuffinFormat::kMagicLength > static_cast(data.size())) { + return Invalid("Invalid file: cannot read magic at offset {}", offset); + } + auto* begin = reinterpret_cast(data.data() + offset); + if (!std::equal(PuffinFormat::kMagicV1.begin(), PuffinFormat::kMagicV1.end(), begin)) { + return Invalid("Invalid file: expected magic at offset {}", offset); + } + return {}; +} + +} // namespace + +PuffinReader::PuffinReader(std::span data) : data_(data) {} + +Result PuffinReader::ReadFileMetadata() { + auto file_size = static_cast(data_.size()); + + if (file_size < PuffinFormat::kFooterStructLength) { + return Invalid("Invalid file: file length {} is less than minimal footer size {}", + file_size, PuffinFormat::kFooterStructLength); + } + + // Read footer struct from end of file + auto footer_struct_offset = file_size - PuffinFormat::kFooterStructLength; + + // Validate footer end magic + ICEBERG_RETURN_UNEXPECTED( + CheckMagic(data_, footer_struct_offset + PuffinFormat::kFooterStructMagicOffset)); + + // Read payload size from footer struct + auto payload_size = ReadLittleEndian( + data_.data() + footer_struct_offset + PuffinFormat::kFooterStructPayloadSizeOffset); + + if (payload_size < 0) { + return Invalid("Invalid file: negative payload size {}", payload_size); + } + + // Calculate total footer size and validate + int64_t footer_size = PuffinFormat::kFooterStartMagicLength + + static_cast(payload_size) + + PuffinFormat::kFooterStructLength; + auto footer_offset = file_size - footer_size; + if (footer_offset < 0) { + return Invalid("Invalid file: footer size {} exceeds file size {}", footer_size, + file_size); + } + + // Validate footer start magic + ICEBERG_RETURN_UNEXPECTED(CheckMagic(data_, footer_offset)); + + // Check flags for footer compression + std::array flags{}; + std::memcpy( + flags.data(), + data_.data() + footer_struct_offset + PuffinFormat::kFooterStructFlagsOffset, 4); + + PuffinCompressionCodec footer_compression = PuffinCompressionCodec::kNone; + if (IsFlagSet(flags, PuffinFlag::kFooterPayloadCompressed)) { + footer_compression = PuffinFormat::kDefaultFooterCompressionCodec; + } + + // Extract footer payload + auto payload_offset = footer_offset + PuffinFormat::kFooterStartMagicLength; + std::span payload_span(data_.data() + payload_offset, payload_size); + ICEBERG_ASSIGN_OR_RAISE(auto payload_bytes, + Decompress(footer_compression, payload_span)); + + // Parse JSON + std::string_view json_str(reinterpret_cast(payload_bytes.data()), + payload_bytes.size()); + ICEBERG_ASSIGN_OR_RAISE(auto file_metadata, FileMetadataFromJsonString(json_str)); + + // Validate header magic + ICEBERG_RETURN_UNEXPECTED(CheckMagic(data_, 0)); + + return file_metadata; +} + +Result>> PuffinReader::ReadBlob( + const BlobMetadata& blob_metadata) { + auto file_size = static_cast(data_.size()); + + if (blob_metadata.offset < 0 || blob_metadata.length < 0 || + blob_metadata.offset > file_size || + blob_metadata.length > file_size - blob_metadata.offset) { + return Invalid("Invalid blob: offset {} + length {} exceeds file size {}", + blob_metadata.offset, blob_metadata.length, file_size); + } + + std::span raw_data(data_.data() + blob_metadata.offset, + blob_metadata.length); + + // Determine compression codec + ICEBERG_ASSIGN_OR_RAISE( + auto codec, PuffinCompressionCodecFromName(blob_metadata.compression_codec)); + ICEBERG_ASSIGN_OR_RAISE(auto decompressed, Decompress(codec, raw_data)); + + return std::pair{blob_metadata, std::move(decompressed)}; +} + +Result>>> +PuffinReader::ReadAll(const std::vector& blobs) { + std::vector>> results; + results.reserve(blobs.size()); + for (const auto& blob : blobs) { + ICEBERG_ASSIGN_OR_RAISE(auto blob_pair, ReadBlob(blob)); + results.push_back(std::move(blob_pair)); + } + return results; +} + +} // namespace iceberg::puffin diff --git a/src/iceberg/puffin/puffin_reader.h b/src/iceberg/puffin/puffin_reader.h new file mode 100644 index 000000000..2a7ed5ec0 --- /dev/null +++ b/src/iceberg/puffin/puffin_reader.h @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/puffin/puffin_reader.h +/// Puffin file reader. + +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/puffin/file_metadata.h" +#include "iceberg/result.h" + +namespace iceberg::puffin { + +/// \brief Reader for Puffin files. +/// +/// Parses a Puffin file from an in-memory buffer. Usage: +/// PuffinReader reader(file_data); +/// auto metadata = reader.ReadFileMetadata(); +/// auto blob = reader.ReadBlob(metadata.value().blobs[0]); +class ICEBERG_EXPORT PuffinReader { + public: + /// \brief Construct a reader from file data. + explicit PuffinReader(std::span data); + + /// \brief Read and return the file metadata from the footer. + Result ReadFileMetadata(); + + /// \brief Read a specific blob's data by its metadata. + /// \param blob_metadata The metadata describing the blob to read. + /// \return A pair of (BlobMetadata, decompressed data), or an error. + Result>> ReadBlob( + const BlobMetadata& blob_metadata); + + /// \brief Read all blobs described in the file metadata. + /// \return A vector of (BlobMetadata, decompressed data) pairs, or an error. + Result>>> ReadAll( + const std::vector& blobs); + + private: + std::span data_; +}; + +} // namespace iceberg::puffin diff --git a/src/iceberg/puffin/puffin_writer.cc b/src/iceberg/puffin/puffin_writer.cc new file mode 100644 index 000000000..7d2a978e4 --- /dev/null +++ b/src/iceberg/puffin/puffin_writer.cc @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/puffin/puffin_writer.h" + +#include + +#include "iceberg/puffin/json_serde_internal.h" +#include "iceberg/puffin/puffin_format.h" +#include "iceberg/util/endian.h" +#include "iceberg/util/macros.h" + +namespace iceberg::puffin { + +PuffinWriter::PuffinWriter(PuffinCompressionCodec default_codec) + : default_codec_(default_codec) {} + +void PuffinWriter::WriteHeader() { + if (header_written_) return; + const auto& magic = PuffinFormat::kMagicV1; + buffer_.insert(buffer_.end(), reinterpret_cast(magic.data()), + reinterpret_cast(magic.data() + magic.size())); + header_written_ = true; +} + +Result PuffinWriter::Add(const Blob& blob) { + if (finished_) { + return Invalid("Writer already finished"); + } + + WriteHeader(); + + auto codec = blob.requested_compression.value_or(default_codec_); + std::span input_span( + reinterpret_cast(blob.data.data()), blob.data.size()); + ICEBERG_ASSIGN_OR_RAISE(auto compressed, Compress(codec, input_span)); + + auto offset = static_cast(buffer_.size()); + auto length = static_cast(compressed.size()); + buffer_.insert(buffer_.end(), compressed.begin(), compressed.end()); + + auto codec_name = CodecName(codec); + BlobMetadata metadata{ + .type = blob.type, + .input_fields = blob.input_fields, + .snapshot_id = blob.snapshot_id, + .sequence_number = blob.sequence_number, + .offset = offset, + .length = length, + .compression_codec = std::string(codec_name), + .properties = blob.properties, + }; + written_blobs_metadata_.push_back(metadata); + return metadata; +} + +Result> PuffinWriter::Finish( + std::unordered_map properties) { + if (finished_) { + return Invalid("Writer already finished"); + } + + WriteHeader(); + + FileMetadata file_metadata{ + .blobs = written_blobs_metadata_, + .properties = std::move(properties), + }; + + auto footer_json = ToJsonString(file_metadata); + auto footer_payload = std::span( + reinterpret_cast(footer_json.data()), footer_json.size()); + + // Footer start magic + auto footer_start = static_cast(buffer_.size()); + const auto& magic = PuffinFormat::kMagicV1; + buffer_.insert(buffer_.end(), reinterpret_cast(magic.data()), + reinterpret_cast(magic.data() + magic.size())); + + // Footer payload + buffer_.insert(buffer_.end(), footer_payload.begin(), footer_payload.end()); + + // Footer struct: payload_size (4) + flags (4) + magic (4) + auto payload_size = static_cast(footer_payload.size()); + std::array size_buf{}; + WriteLittleEndian(payload_size, size_buf.data()); + buffer_.insert(buffer_.end(), size_buf.begin(), size_buf.end()); + + // Flags (no compression for now) + std::array flags{}; + buffer_.insert(buffer_.end(), flags.begin(), flags.end()); + + // Footer end magic + buffer_.insert(buffer_.end(), reinterpret_cast(magic.data()), + reinterpret_cast(magic.data() + magic.size())); + + footer_size_ = static_cast(buffer_.size()) - footer_start; + finished_ = true; + return std::move(buffer_); +} + +const std::vector& PuffinWriter::written_blobs_metadata() const { + return written_blobs_metadata_; +} + +std::optional PuffinWriter::footer_size() const { return footer_size_; } + +} // namespace iceberg::puffin diff --git a/src/iceberg/puffin/puffin_writer.h b/src/iceberg/puffin/puffin_writer.h new file mode 100644 index 000000000..7930ca210 --- /dev/null +++ b/src/iceberg/puffin/puffin_writer.h @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/puffin/puffin_writer.h +/// Puffin file writer. + +#include +#include +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/puffin/file_metadata.h" +#include "iceberg/result.h" + +namespace iceberg::puffin { + +/// \brief Writer for Puffin files. +/// +/// Builds a complete Puffin file in memory. Usage: +/// PuffinWriter writer; +/// writer.Add(blob1); +/// writer.Add(blob2); +/// auto result = writer.Finish({{"created-by", "iceberg-cpp"}}); +/// // result.value() contains the serialized file bytes +class ICEBERG_EXPORT PuffinWriter { + public: + /// \brief Construct a writer with the given default compression codec. + explicit PuffinWriter( + PuffinCompressionCodec default_codec = PuffinCompressionCodec::kNone); + + /// \brief Add a blob to be written. + /// \return The BlobMetadata for the written blob, or an error. + Result Add(const Blob& blob); + + /// \brief Finalize the file and return the serialized bytes. + /// \param properties File-level properties to include in the footer. + /// \return The complete Puffin file as a byte vector, or an error. + Result> Finish( + std::unordered_map properties = {}); + + /// \brief Get metadata for all blobs written so far. + const std::vector& written_blobs_metadata() const; + + /// \brief Get the footer size after Finish() has been called. + /// \return The footer size, or std::nullopt if Finish() has not been called. + std::optional footer_size() const; + + private: + PuffinCompressionCodec default_codec_; + std::vector buffer_; + std::vector written_blobs_metadata_; + bool header_written_ = false; + bool finished_ = false; + std::optional footer_size_; + + void WriteHeader(); +}; + +} // namespace iceberg::puffin diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 92fd0c1c2..ea4d40895 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -129,7 +129,11 @@ add_iceberg_test(util_test add_iceberg_test(roaring_test SOURCES roaring_test.cc) -add_iceberg_test(puffin_test SOURCES puffin_format_test.cc puffin_json_test.cc) +add_iceberg_test(puffin_test + SOURCES + puffin_format_test.cc + puffin_json_test.cc + puffin_reader_writer_test.cc) if(ICEBERG_BUILD_BUNDLE) add_iceberg_test(avro_test diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build index f3eeb7da5..f1106de41 100644 --- a/src/iceberg/test/meson.build +++ b/src/iceberg/test/meson.build @@ -103,7 +103,11 @@ iceberg_tests = { }, 'roaring_test': {'sources': files('roaring_test.cc')}, 'puffin_test': { - 'sources': files('puffin_format_test.cc', 'puffin_json_test.cc'), + 'sources': files( + 'puffin_format_test.cc', + 'puffin_json_test.cc', + 'puffin_reader_writer_test.cc', + ), }, } diff --git a/src/iceberg/test/puffin_reader_writer_test.cc b/src/iceberg/test/puffin_reader_writer_test.cc new file mode 100644 index 000000000..f0b4d31c9 --- /dev/null +++ b/src/iceberg/test/puffin_reader_writer_test.cc @@ -0,0 +1,433 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include + +#include + +#include "iceberg/puffin/file_metadata.h" +#include "iceberg/puffin/puffin_reader.h" +#include "iceberg/puffin/puffin_writer.h" +#include "iceberg/test/matchers.h" + +namespace iceberg::puffin { + +namespace { + +std::vector ToBytes(std::initializer_list values) { + std::vector result; + result.reserve(values.size()); + for (auto v : values) { + result.push_back(static_cast(v)); + } + return result; +} + +std::vector ToBytes(std::string_view str) { + return {reinterpret_cast(str.data()), + reinterpret_cast(str.data() + str.size())}; +} + +} // namespace + +// ============================================================================ +// PuffinWriter Tests +// ============================================================================ + +TEST(PuffinWriterTest, WriteEmptyFile) { + PuffinWriter writer; + auto result = writer.Finish(); + ASSERT_THAT(result, IsOk()); + auto& data = result.value(); + + // Header magic (4) + footer start magic (4) + JSON payload + footer struct (12) + EXPECT_GE(data.size(), 20u); + // Header magic + EXPECT_EQ(data[0], std::byte{0x50}); + EXPECT_EQ(data[1], std::byte{0x46}); + EXPECT_EQ(data[2], std::byte{0x41}); + EXPECT_EQ(data[3], std::byte{0x31}); + // Footer end magic + auto sz = data.size(); + EXPECT_EQ(data[sz - 4], std::byte{0x50}); + EXPECT_EQ(data[sz - 3], std::byte{0x46}); + EXPECT_EQ(data[sz - 2], std::byte{0x41}); + EXPECT_EQ(data[sz - 1], std::byte{0x31}); + + EXPECT_TRUE(writer.written_blobs_metadata().empty()); + ASSERT_TRUE(writer.footer_size().has_value()); +} + +TEST(PuffinWriterTest, WriterRejectsAfterFinish) { + PuffinWriter writer; + ASSERT_THAT(writer.Finish(), IsOk()); + + // Double finish + EXPECT_THAT(writer.Finish(), IsError(ErrorKind::kInvalid)); + + // Add after finish + Blob blob{.type = "a", .snapshot_id = 1, .sequence_number = 0}; + EXPECT_THAT(writer.Add(blob), IsError(ErrorKind::kInvalid)); +} + +TEST(PuffinWriterTest, WriteEmptyBlobData) { + PuffinWriter writer; + Blob blob{ + .type = "empty-blob", + .input_fields = {1}, + .snapshot_id = 1, + .sequence_number = 0, + .data = {}, + }; + auto meta = writer.Add(blob); + ASSERT_THAT(meta, IsOk()); + EXPECT_EQ(meta.value().offset, 4); + EXPECT_EQ(meta.value().length, 0); + + auto result = writer.Finish(); + ASSERT_THAT(result, IsOk()); + + PuffinReader reader(result.value()); + auto fm = reader.ReadFileMetadata(); + ASSERT_THAT(fm, IsOk()); + ASSERT_EQ(fm.value().blobs.size(), 1); + + auto blob_result = reader.ReadBlob(fm.value().blobs[0]); + ASSERT_THAT(blob_result, IsOk()); + EXPECT_TRUE(blob_result.value().second.empty()); +} + +TEST(PuffinWriterTest, WriteLargeBlob) { + PuffinWriter writer; + std::vector large_data(4096); + for (size_t i = 0; i < large_data.size(); ++i) { + large_data[i] = static_cast(i & 0xFF); + } + Blob blob{ + .type = "large-blob", + .input_fields = {1, 2, 3}, + .snapshot_id = 999, + .sequence_number = 42, + .data = large_data, + }; + auto meta = writer.Add(blob); + ASSERT_THAT(meta, IsOk()); + EXPECT_EQ(meta.value().length, 4096); + + auto result = writer.Finish(); + ASSERT_THAT(result, IsOk()); + + PuffinReader reader(result.value()); + auto fm = reader.ReadFileMetadata(); + ASSERT_THAT(fm, IsOk()); + ASSERT_EQ(fm.value().blobs.size(), 1); + + auto blob_result = reader.ReadBlob(fm.value().blobs[0]); + ASSERT_THAT(blob_result, IsOk()); + auto& read_data = blob_result.value().second; + ASSERT_EQ(read_data.size(), 4096); + for (size_t i = 0; i < read_data.size(); ++i) { + EXPECT_EQ(read_data[i], static_cast(i & 0xFF)) + << "mismatch at index " << i; + } +} + +// ============================================================================ +// Round-Trip Tests +// ============================================================================ + +TEST(PuffinRoundTripTest, SingleBlob) { + PuffinWriter writer; + EXPECT_FALSE(writer.footer_size().has_value()); + + std::vector blob_data = {0x01, 0x02, 0x03, 0x04, 0x05}; + Blob blob{ + .type = "test-blob", + .input_fields = {1, 2}, + .snapshot_id = 42, + .sequence_number = 7, + .data = blob_data, + }; + auto meta = writer.Add(blob); + ASSERT_THAT(meta, IsOk()); + EXPECT_EQ(meta.value().type, "test-blob"); + EXPECT_EQ(meta.value().offset, 4); // after header magic + EXPECT_EQ(meta.value().length, 5); + EXPECT_EQ(writer.written_blobs_metadata().size(), 1); + + auto file_result = writer.Finish({{"created-by", "test"}}); + ASSERT_THAT(file_result, IsOk()); + ASSERT_TRUE(writer.footer_size().has_value()); + EXPECT_GT(writer.footer_size().value(), 0); + + PuffinReader reader(file_result.value()); + auto fm = reader.ReadFileMetadata(); + ASSERT_THAT(fm, IsOk()); + ASSERT_EQ(fm.value().blobs.size(), 1); + EXPECT_EQ(fm.value().blobs[0].type, "test-blob"); + EXPECT_EQ(fm.value().properties.at("created-by"), "test"); + + auto blob_result = reader.ReadBlob(fm.value().blobs[0]); + ASSERT_THAT(blob_result, IsOk()); + EXPECT_EQ(blob_result.value().second, ToBytes({0x01, 0x02, 0x03, 0x04, 0x05})); +} + +TEST(PuffinRoundTripTest, MultipleBlobs) { + PuffinWriter writer; + EXPECT_TRUE(writer.written_blobs_metadata().empty()); + + // Add first blob (no properties) + std::vector data1 = {'a', 'b', 'c'}; + ASSERT_THAT(writer.Add(Blob{.type = "first", + .input_fields = {1}, + .snapshot_id = 1, + .sequence_number = 0, + .data = data1}), + IsOk()); + EXPECT_EQ(writer.written_blobs_metadata().size(), 1); + + // Add second blob (with properties) + std::vector data2 = {'d', 'e', 'f', 'g'}; + auto meta2 = writer.Add(Blob{.type = "second", + .input_fields = {2}, + .snapshot_id = 2, + .sequence_number = 1, + .data = data2, + .properties = {{"key", "val"}}}); + ASSERT_THAT(meta2, IsOk()); + // Second blob starts after header (4) + first blob (3) + EXPECT_EQ(meta2.value().offset, 7); + EXPECT_EQ(meta2.value().length, 4); + EXPECT_EQ(writer.written_blobs_metadata().size(), 2); + + EXPECT_FALSE(writer.footer_size().has_value()); + auto file_result = writer.Finish(); + ASSERT_THAT(file_result, IsOk()); + ASSERT_TRUE(writer.footer_size().has_value()); + EXPECT_GT(writer.footer_size().value(), 0); + + // Read back + PuffinReader reader(file_result.value()); + auto fm = reader.ReadFileMetadata(); + ASSERT_THAT(fm, IsOk()); + ASSERT_EQ(fm.value().blobs.size(), 2); + + // Verify properties: first has none, second has one + EXPECT_TRUE(fm.value().blobs[0].properties.empty()); + EXPECT_EQ(fm.value().blobs[1].properties.at("key"), "val"); + + // ReadAll + auto all = reader.ReadAll(fm.value().blobs); + ASSERT_THAT(all, IsOk()); + ASSERT_EQ(all.value().size(), 2); + EXPECT_EQ(all.value()[0].second, ToBytes("abc")); + EXPECT_EQ(all.value()[1].second, ToBytes("defg")); +} + +TEST(PuffinRoundTripTest, WithProperties) { + PuffinWriter writer; + std::string text = "hello puffin"; + std::vector blob_data(text.begin(), text.end()); + ASSERT_THAT(writer.Add(Blob{.type = "text-blob", + .input_fields = {1}, + .snapshot_id = 100, + .sequence_number = 5, + .data = blob_data, + .properties = {{"encoding", "utf-8"}}}), + IsOk()); + auto file_result = writer.Finish({{"created-by", "iceberg-cpp-test"}}); + ASSERT_THAT(file_result, IsOk()); + + PuffinReader reader(file_result.value()); + auto fm = reader.ReadFileMetadata(); + ASSERT_THAT(fm, IsOk()); + EXPECT_EQ(fm.value().properties.at("created-by"), "iceberg-cpp-test"); + ASSERT_EQ(fm.value().blobs.size(), 1); + EXPECT_EQ(fm.value().blobs[0].type, "text-blob"); + EXPECT_EQ(fm.value().blobs[0].properties.at("encoding"), "utf-8"); + + auto blob_result = reader.ReadBlob(fm.value().blobs[0]); + ASSERT_THAT(blob_result, IsOk()); + EXPECT_EQ(blob_result.value().second, ToBytes("hello puffin")); +} + +// ============================================================================ +// PuffinReader Error Tests +// ============================================================================ + +TEST(PuffinReaderTest, ReadEmptyFile) { + PuffinWriter writer; + auto result = writer.Finish(); + ASSERT_THAT(result, IsOk()); + + PuffinReader reader(result.value()); + auto fm = reader.ReadFileMetadata(); + ASSERT_THAT(fm, IsOk()); + EXPECT_TRUE(fm.value().blobs.empty()); + EXPECT_TRUE(fm.value().properties.empty()); +} + +TEST(PuffinReaderTest, InvalidMagic) { + auto bad_data = ToBytes({0x00, 0x00, 0x00, 0x00}); + PuffinReader reader(bad_data); + EXPECT_THAT(reader.ReadFileMetadata(), IsError(ErrorKind::kInvalid)); +} + +TEST(PuffinReaderTest, TruncatedFile) { + auto tiny = ToBytes({0x50, 0x46}); + PuffinReader reader(tiny); + EXPECT_THAT(reader.ReadFileMetadata(), IsError(ErrorKind::kInvalid)); +} + +TEST(PuffinReaderTest, InvalidBlobOffset) { + PuffinWriter writer; + auto file_result = writer.Finish(); + ASSERT_THAT(file_result, IsOk()); + + PuffinReader reader(file_result.value()); + BlobMetadata bad_meta{ + .type = "bad", + .snapshot_id = 1, + .sequence_number = 0, + .offset = 9999, + .length = 100, + }; + EXPECT_THAT(reader.ReadBlob(bad_meta), IsError(ErrorKind::kInvalid)); +} + +// ============================================================================ +// Java Binary Compatibility Tests +// ============================================================================ + +TEST(PuffinReaderTest, JavaEmptyPuffinCompatibility) { + // Exact binary content of v1/empty-puffin-uncompressed.bin from Java test resources + auto java_empty = ToBytes({ + 0x50, 0x46, 0x41, 0x31, // header magic + 0x50, 0x46, 0x41, 0x31, // footer start magic + 0x7b, 0x22, 0x62, 0x6c, 0x6f, 0x62, + 0x73, 0x22, 0x3a, 0x5b, 0x5d, 0x7d, // {"blobs":[]} + 0x0c, 0x00, 0x00, 0x00, // payload size = 12 + 0x00, 0x00, 0x00, 0x00, // flags = 0 + 0x50, 0x46, 0x41, 0x31, // footer end magic + }); + + PuffinReader reader(java_empty); + auto fm = reader.ReadFileMetadata(); + ASSERT_THAT(fm, IsOk()); + EXPECT_TRUE(fm.value().blobs.empty()); + EXPECT_TRUE(fm.value().properties.empty()); +} + +// Verify binary compatibility with Java's sample-metric-data-uncompressed.bin. +// This file contains two blobs: "abcdefghi" (9 bytes) and binary data including +// a null byte and emoji (83 bytes). +TEST(PuffinReaderTest, JavaSampleMetricDataCompatibility) { + // clang-format off + auto java_sample = ToBytes({ + // Header magic + 0x50, 0x46, 0x41, 0x31, + // Blob 1: "abcdefghi" (9 bytes, offset=4) + 0x61, 0x62, 0x63, 0x64, 0x65, 0x66, 0x67, 0x68, 0x69, + // Blob 2: "some blob \0 binary data 🤯 that is not very very very very + // very very long, is it?" (83 bytes, offset=13) + 0x73, 0x6f, 0x6d, 0x65, 0x20, 0x62, 0x6c, 0x6f, 0x62, 0x20, 0x00, 0x20, + 0x62, 0x69, 0x6e, 0x61, 0x72, 0x79, 0x20, 0x64, 0x61, 0x74, 0x61, 0x20, + 0xf0, 0x9f, 0xa4, 0xaf, 0x20, 0x74, 0x68, 0x61, 0x74, 0x20, 0x69, 0x73, + 0x20, 0x6e, 0x6f, 0x74, 0x20, 0x76, 0x65, 0x72, 0x79, 0x20, 0x76, 0x65, + 0x72, 0x79, 0x20, 0x76, 0x65, 0x72, 0x79, 0x20, 0x76, 0x65, 0x72, 0x79, + 0x20, 0x76, 0x65, 0x72, 0x79, 0x20, 0x76, 0x65, 0x72, 0x79, 0x20, 0x6c, + 0x6f, 0x6e, 0x67, 0x2c, 0x20, 0x69, 0x73, 0x20, 0x69, 0x74, 0x3f, + // Footer start magic + 0x50, 0x46, 0x41, 0x31, + // Footer payload (JSON) + 0x7b, 0x22, 0x62, 0x6c, 0x6f, 0x62, 0x73, 0x22, 0x3a, 0x5b, 0x7b, 0x22, + 0x74, 0x79, 0x70, 0x65, 0x22, 0x3a, 0x22, 0x73, 0x6f, 0x6d, 0x65, 0x2d, + 0x62, 0x6c, 0x6f, 0x62, 0x22, 0x2c, 0x22, 0x66, 0x69, 0x65, 0x6c, 0x64, + 0x73, 0x22, 0x3a, 0x5b, 0x31, 0x5d, 0x2c, 0x22, 0x73, 0x6e, 0x61, 0x70, + 0x73, 0x68, 0x6f, 0x74, 0x2d, 0x69, 0x64, 0x22, 0x3a, 0x32, 0x2c, 0x22, + 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x2d, 0x6e, 0x75, 0x6d, + 0x62, 0x65, 0x72, 0x22, 0x3a, 0x31, 0x2c, 0x22, 0x6f, 0x66, 0x66, 0x73, + 0x65, 0x74, 0x22, 0x3a, 0x34, 0x2c, 0x22, 0x6c, 0x65, 0x6e, 0x67, 0x74, + 0x68, 0x22, 0x3a, 0x39, 0x7d, 0x2c, 0x7b, 0x22, 0x74, 0x79, 0x70, 0x65, + 0x22, 0x3a, 0x22, 0x73, 0x6f, 0x6d, 0x65, 0x2d, 0x6f, 0x74, 0x68, 0x65, + 0x72, 0x2d, 0x62, 0x6c, 0x6f, 0x62, 0x22, 0x2c, 0x22, 0x66, 0x69, 0x65, + 0x6c, 0x64, 0x73, 0x22, 0x3a, 0x5b, 0x32, 0x5d, 0x2c, 0x22, 0x73, 0x6e, + 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x2d, 0x69, 0x64, 0x22, 0x3a, 0x32, + 0x2c, 0x22, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x2d, 0x6e, + 0x75, 0x6d, 0x62, 0x65, 0x72, 0x22, 0x3a, 0x31, 0x2c, 0x22, 0x6f, 0x66, + 0x66, 0x73, 0x65, 0x74, 0x22, 0x3a, 0x31, 0x33, 0x2c, 0x22, 0x6c, 0x65, + 0x6e, 0x67, 0x74, 0x68, 0x22, 0x3a, 0x38, 0x33, 0x7d, 0x5d, 0x2c, 0x22, + 0x70, 0x72, 0x6f, 0x70, 0x65, 0x72, 0x74, 0x69, 0x65, 0x73, 0x22, 0x3a, + 0x7b, 0x22, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x2d, 0x62, 0x79, + 0x22, 0x3a, 0x22, 0x54, 0x65, 0x73, 0x74, 0x20, 0x31, 0x32, 0x33, 0x34, + 0x22, 0x7d, 0x7d, + // Footer struct: payload_size (243 = 0xf3) + flags (0) + magic + 0xf3, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, + 0x50, 0x46, 0x41, 0x31, + }); + // clang-format on + + PuffinReader reader(java_sample); + auto fm = reader.ReadFileMetadata(); + ASSERT_THAT(fm, IsOk()); + ASSERT_EQ(fm.value().blobs.size(), 2); + EXPECT_EQ(fm.value().properties.at("created-by"), "Test 1234"); + + // Blob 1: "some-blob", fields=[1], snapshot=2, seq=1 + EXPECT_EQ(fm.value().blobs[0].type, "some-blob"); + EXPECT_EQ(fm.value().blobs[0].input_fields, std::vector{1}); + EXPECT_EQ(fm.value().blobs[0].snapshot_id, 2); + EXPECT_EQ(fm.value().blobs[0].sequence_number, 1); + EXPECT_EQ(fm.value().blobs[0].offset, 4); + EXPECT_EQ(fm.value().blobs[0].length, 9); + + // Blob 2: "some-other-blob", fields=[2], snapshot=2, seq=1 + EXPECT_EQ(fm.value().blobs[1].type, "some-other-blob"); + EXPECT_EQ(fm.value().blobs[1].input_fields, std::vector{2}); + EXPECT_EQ(fm.value().blobs[1].offset, 13); + EXPECT_EQ(fm.value().blobs[1].length, 83); + + // Read blob 1 data: "abcdefghi" + auto blob1 = reader.ReadBlob(fm.value().blobs[0]); + ASSERT_THAT(blob1, IsOk()); + EXPECT_EQ(blob1.value().second, ToBytes("abcdefghi")); + + // Read blob 2 data: contains null byte and emoji + auto blob2 = reader.ReadBlob(fm.value().blobs[1]); + ASSERT_THAT(blob2, IsOk()); + EXPECT_EQ(blob2.value().second.size(), 83); + // Verify null byte at position 10 + EXPECT_EQ(blob2.value().second[10], std::byte{0x00}); + // Verify emoji 🤯 (U+1F92F) at positions 24-27 + EXPECT_EQ(blob2.value().second[24], std::byte{0xf0}); + EXPECT_EQ(blob2.value().second[25], std::byte{0x9f}); + EXPECT_EQ(blob2.value().second[26], std::byte{0xa4}); + EXPECT_EQ(blob2.value().second[27], std::byte{0xaf}); + + // ReadAll should return both blobs + auto all = reader.ReadAll(fm.value().blobs); + ASSERT_THAT(all, IsOk()); + ASSERT_EQ(all.value().size(), 2); +} + +} // namespace iceberg::puffin