From 6f540f429b9124c89d59b71cc6dea2c36c48ac13 Mon Sep 17 00:00:00 2001 From: Neelesh Salian Date: Tue, 14 Apr 2026 21:40:14 -0700 Subject: [PATCH 1/3] Fix thread-safety in Variant lazy caches and add comments Co-authored-by: Steve Loughran --- .../org/apache/parquet/variant/Variant.java | 188 +++++++++++------- .../parquet/variant/VariantBuilder.java | 20 +- .../parquet/variant/VariantConverters.java | 4 +- 3 files changed, 140 insertions(+), 72 deletions(-) diff --git a/parquet-variant/src/main/java/org/apache/parquet/variant/Variant.java b/parquet-variant/src/main/java/org/apache/parquet/variant/Variant.java index 0ccb8359ba..bcdf9711a1 100644 --- a/parquet-variant/src/main/java/org/apache/parquet/variant/Variant.java +++ b/parquet-variant/src/main/java/org/apache/parquet/variant/Variant.java @@ -36,6 +36,26 @@ public final class Variant { */ final ByteBuffer metadata; + /** + * Pre-computed metadata dictionary size + */ + private final int dictSize; + + /** + * Lazy cache for metadata dictionary strings. + */ + private volatile String[] metadataCache; + + /** + * Lazy cache for the parsed object header. + */ + private volatile VariantUtil.ObjectInfo cachedObjectInfo; + + /** + * Lazy cache for the parsed array header. + */ + private volatile VariantUtil.ArrayInfo cachedArrayInfo; + /** * The threshold to switch from linear search to binary search when looking up a field by key in * an object. This is a performance optimization to avoid the overhead of binary search for a @@ -67,6 +87,26 @@ public Variant(ByteBuffer value, ByteBuffer metadata) { "Unsupported variant metadata version: %d", metadata.get(metadata.position()) & VariantUtil.VERSION_MASK)); } + + // Pre-compute dictionary size for lazy metadata cache allocation. + int pos = this.metadata.position(); + int metaOffsetSize = ((this.metadata.get(pos) >> 6) & 0x3) + 1; + if (this.metadata.remaining() > 1) { + this.dictSize = VariantUtil.readUnsigned(this.metadata, pos + 1, metaOffsetSize); + } else { + this.dictSize = 0; + } + this.metadataCache = null; + } + + /** + * Package-private constructor that shares pre-parsed metadata state from a parent Variant. + */ + Variant(ByteBuffer value, ByteBuffer metadata, String[] metadataCache, int dictSize) { + this.value = value.asReadOnlyBuffer(); + this.metadata = metadata.asReadOnlyBuffer(); + this.metadataCache = metadataCache; + this.dictSize = dictSize; } public ByteBuffer getValueBuffer() { @@ -194,7 +234,7 @@ public Type getType() { * @throws IllegalArgumentException if `getType()` does not return `Type.OBJECT` */ public int numObjectElements() { - return VariantUtil.getObjectInfo(value).numElements; + return objectInfo().numElements; } /** @@ -206,22 +246,18 @@ public int numObjectElements() { * @throws IllegalArgumentException if `getType()` does not return `Type.OBJECT` */ public Variant getFieldByKey(String key) { - VariantUtil.ObjectInfo info = VariantUtil.getObjectInfo(value); - // Use linear search for a short list. Switch to binary search when the length reaches - // `BINARY_SEARCH_THRESHOLD`. + VariantUtil.ObjectInfo info = objectInfo(); + int idStart = value.position() + info.idStartOffset; + int offsetStart = value.position() + info.offsetStartOffset; + int dataStart = value.position() + info.dataStartOffset; + if (info.numElements < BINARY_SEARCH_THRESHOLD) { for (int i = 0; i < info.numElements; ++i) { - ObjectField field = getFieldAtIndex( - i, - value, - metadata, - info.idSize, - info.offsetSize, - value.position() + info.idStartOffset, - value.position() + info.offsetStartOffset, - value.position() + info.dataStartOffset); - if (field.key.equals(key)) { - return field.value; + int id = VariantUtil.readUnsigned(value, idStart + info.idSize * i, info.idSize); + String fieldKey = getMetadataKeyCached(id); + if (fieldKey.equals(key)) { + int offset = VariantUtil.readUnsigned(value, offsetStart + info.offsetSize * i, info.offsetSize); + return childVariant(VariantUtil.slice(value, dataStart + offset)); } } } else { @@ -232,22 +268,16 @@ public Variant getFieldByKey(String key) { // performance optimization, because it can properly handle the case where `low + high` // overflows int. int mid = (low + high) >>> 1; - ObjectField field = getFieldAtIndex( - mid, - value, - metadata, - info.idSize, - info.offsetSize, - value.position() + info.idStartOffset, - value.position() + info.offsetStartOffset, - value.position() + info.dataStartOffset); - int cmp = field.key.compareTo(key); + int midId = VariantUtil.readUnsigned(value, idStart + info.idSize * mid, info.idSize); + String midKey = getMetadataKeyCached(midId); + int cmp = midKey.compareTo(key); if (cmp < 0) { low = mid + 1; } else if (cmp > 0) { high = mid - 1; } else { - return field.value; + int offset = VariantUtil.readUnsigned(value, offsetStart + info.offsetSize * mid, info.offsetSize); + return childVariant(VariantUtil.slice(value, dataStart + offset)); } } } @@ -275,35 +305,14 @@ public ObjectField(String key, Variant value) { * @throws IllegalArgumentException if `getType()` does not return `Type.OBJECT` */ public ObjectField getFieldAtIndex(int idx) { - VariantUtil.ObjectInfo info = VariantUtil.getObjectInfo(value); - // Use linear search for a short list. Switch to binary search when the length reaches - // `BINARY_SEARCH_THRESHOLD`. - ObjectField field = getFieldAtIndex( - idx, - value, - metadata, - info.idSize, - info.offsetSize, - value.position() + info.idStartOffset, - value.position() + info.offsetStartOffset, - value.position() + info.dataStartOffset); - return field; - } - - static ObjectField getFieldAtIndex( - int index, - ByteBuffer value, - ByteBuffer metadata, - int idSize, - int offsetSize, - int idStart, - int offsetStart, - int dataStart) { - // idStart, offsetStart, and dataStart are absolute positions in the `value` buffer. - int id = VariantUtil.readUnsigned(value, idStart + idSize * index, idSize); - int offset = VariantUtil.readUnsigned(value, offsetStart + offsetSize * index, offsetSize); - String key = VariantUtil.getMetadataKey(metadata, id); - Variant v = new Variant(VariantUtil.slice(value, dataStart + offset), metadata); + VariantUtil.ObjectInfo info = objectInfo(); + int idStart = value.position() + info.idStartOffset; + int offsetStart = value.position() + info.offsetStartOffset; + int dataStart = value.position() + info.dataStartOffset; + int id = VariantUtil.readUnsigned(value, idStart + info.idSize * idx, info.idSize); + int offset = VariantUtil.readUnsigned(value, offsetStart + info.offsetSize * idx, info.offsetSize); + String key = getMetadataKeyCached(id); + Variant v = childVariant(VariantUtil.slice(value, dataStart + offset)); return new ObjectField(key, v); } @@ -312,7 +321,7 @@ static ObjectField getFieldAtIndex( * @throws IllegalArgumentException if `getType()` does not return `Type.ARRAY` */ public int numArrayElements() { - return VariantUtil.getArrayInfo(value).numElements; + return arrayInfo().numElements; } /** @@ -324,23 +333,64 @@ public int numArrayElements() { * @throws IllegalArgumentException if `getType()` does not return `Type.ARRAY` */ public Variant getElementAtIndex(int index) { - VariantUtil.ArrayInfo info = VariantUtil.getArrayInfo(value); + VariantUtil.ArrayInfo info = arrayInfo(); if (index < 0 || index >= info.numElements) { return null; } - return getElementAtIndex( - index, - value, - metadata, - info.offsetSize, - value.position() + info.offsetStartOffset, - value.position() + info.dataStartOffset); + int offsetStart = value.position() + info.offsetStartOffset; + int dataStart = value.position() + info.dataStartOffset; + int offset = VariantUtil.readUnsigned(value, offsetStart + info.offsetSize * index, info.offsetSize); + return childVariant(VariantUtil.slice(value, dataStart + offset)); + } + + /** + * Creates a child Variant that shares this instance's metadata cache. + */ + private Variant childVariant(ByteBuffer childValue) { + return new Variant(childValue, metadata, metadataCache, dictSize); } - private static Variant getElementAtIndex( - int index, ByteBuffer value, ByteBuffer metadata, int offsetSize, int offsetStart, int dataStart) { - // offsetStart and dataStart are absolute positions in the `value` buffer. - int offset = VariantUtil.readUnsigned(value, offsetStart + offsetSize * index, offsetSize); - return new Variant(VariantUtil.slice(value, dataStart + offset), metadata); + /** + * Returns the metadata dictionary string for the given ID, caching the result. + */ + String getMetadataKeyCached(int id) { + // Fall back to uncached lookup for out-of-range IDs + if (id < 0 || id >= dictSize) { + return VariantUtil.getMetadataKey(metadata, id); + } + // Demand-create shared dictionary cache + String[] cache = metadataCache; + if (cache == null) { + cache = new String[dictSize]; + metadataCache = cache; + } + if (cache[id] == null) { + cache[id] = VariantUtil.getMetadataKey(metadata, id); + } + return cache[id]; + } + + /** + * Returns the cached object header, parsing it on first access. + */ + private VariantUtil.ObjectInfo objectInfo() { + VariantUtil.ObjectInfo info = cachedObjectInfo; + if (info == null) { + info = VariantUtil.getObjectInfo(value); + cachedObjectInfo = info; + } + return info; + } + + /** + * Returns the cached array header, parsing it on first access. + */ + private VariantUtil.ArrayInfo arrayInfo() { + VariantUtil.ArrayInfo info = cachedArrayInfo; + if (info == null) { + info = VariantUtil.getArrayInfo(value); + cachedArrayInfo = info; + } + return info; } } diff --git a/parquet-variant/src/main/java/org/apache/parquet/variant/VariantBuilder.java b/parquet-variant/src/main/java/org/apache/parquet/variant/VariantBuilder.java index 4b8eb6d8c3..213f69729f 100644 --- a/parquet-variant/src/main/java/org/apache/parquet/variant/VariantBuilder.java +++ b/parquet-variant/src/main/java/org/apache/parquet/variant/VariantBuilder.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Set; +import org.apache.parquet.io.api.Binary; /** * Builder for creating Variant value and metadata. @@ -109,7 +110,14 @@ public void appendEncodedValue(ByteBuffer value) { */ public void appendString(String str) { onAppend(); - byte[] data = str.getBytes(StandardCharsets.UTF_8); + writeUTF8bytes(str.getBytes(StandardCharsets.UTF_8)); + } + + /** + * Write bytes as a UTF8 string. + * @param data data to write; this is not modified. + */ + private void writeUTF8bytes(final byte[] data) { boolean longStr = data.length > VariantUtil.MAX_SHORT_STR_SIZE; checkCapacity((longStr ? 1 + VariantUtil.U32_SIZE : 1) + data.length); if (longStr) { @@ -125,6 +133,16 @@ public void appendString(String str) { writePos += data.length; } + /** + * Given a Binary, append it to the variant as a string. + * Avoids intermediate String creation when unmarshalling from shredded string columns. + * @param binary source data. + */ + void appendAsString(Binary binary) { + onAppend(); + writeUTF8bytes(binary.getBytesUnsafe()); + } + /** * Appends a null value to the Variant builder. */ diff --git a/parquet-variant/src/main/java/org/apache/parquet/variant/VariantConverters.java b/parquet-variant/src/main/java/org/apache/parquet/variant/VariantConverters.java index 6d0986c2b2..bda088c55d 100644 --- a/parquet-variant/src/main/java/org/apache/parquet/variant/VariantConverters.java +++ b/parquet-variant/src/main/java/org/apache/parquet/variant/VariantConverters.java @@ -233,6 +233,7 @@ static class PartiallyShreddedFieldsConverter extends GroupConverter { PartiallyShreddedFieldsConverter(GroupType fieldsType, ParentConverter parent) { this.converters = new Converter[fieldsType.getFieldCount()]; this.parent = parent; + ParentConverter newParent = converter -> converter.accept(objectBuilder); for (int index = 0; index < fieldsType.getFieldCount(); index += 1) { Type field = fieldsType.getType(index); @@ -240,7 +241,6 @@ static class PartiallyShreddedFieldsConverter extends GroupConverter { String name = field.getName(); shreddedFieldNames.add(name); - ParentConverter newParent = converter -> converter.accept(objectBuilder); converters[index] = new FieldValueConverter(name, field.asGroupType(), newParent); } } @@ -501,7 +501,7 @@ static class VariantStringConverter extends ShreddedScalarConverter { @Override public void addBinary(Binary value) { - parent.build(builder -> builder.appendString(value.toStringUsingUTF8())); + parent.build(builder -> builder.appendAsString(value)); } } From fcbef756486ab9767dbde41593f73ef82c9963e0 Mon Sep 17 00:00:00 2001 From: Neelesh Salian Date: Sun, 19 Apr 2026 17:06:43 -0700 Subject: [PATCH 2/3] Remove unnecessary volatile fields and fix PR comments --- .../src/main/java/org/apache/parquet/variant/Variant.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/parquet-variant/src/main/java/org/apache/parquet/variant/Variant.java b/parquet-variant/src/main/java/org/apache/parquet/variant/Variant.java index bcdf9711a1..a63453be69 100644 --- a/parquet-variant/src/main/java/org/apache/parquet/variant/Variant.java +++ b/parquet-variant/src/main/java/org/apache/parquet/variant/Variant.java @@ -37,7 +37,7 @@ public final class Variant { final ByteBuffer metadata; /** - * Pre-computed metadata dictionary size + * Pre-computed metadata dictionary size. */ private final int dictSize; @@ -49,12 +49,12 @@ public final class Variant { /** * Lazy cache for the parsed object header. */ - private volatile VariantUtil.ObjectInfo cachedObjectInfo; + private VariantUtil.ObjectInfo cachedObjectInfo; /** * Lazy cache for the parsed array header. */ - private volatile VariantUtil.ArrayInfo cachedArrayInfo; + private VariantUtil.ArrayInfo cachedArrayInfo; /** * The threshold to switch from linear search to binary search when looking up a field by key in @@ -363,6 +363,7 @@ String getMetadataKeyCached(int id) { if (cache == null) { cache = new String[dictSize]; metadataCache = cache; + cache = metadataCache; } if (cache[id] == null) { cache[id] = VariantUtil.getMetadataKey(metadata, id); From c6cc9ed03452835841967f33ac796627a02c14c5 Mon Sep 17 00:00:00 2001 From: Neelesh Salian Date: Mon, 27 Apr 2026 10:33:53 -0700 Subject: [PATCH 3/3] Add readUnsignedLittleEndian for bulk ByteBuffer reads and concurrency javadoc --- .../org/apache/parquet/variant/Variant.java | 50 +++++++++++-------- .../apache/parquet/variant/VariantUtil.java | 25 ++++++++++ 2 files changed, 54 insertions(+), 21 deletions(-) diff --git a/parquet-variant/src/main/java/org/apache/parquet/variant/Variant.java b/parquet-variant/src/main/java/org/apache/parquet/variant/Variant.java index a63453be69..272317ff76 100644 --- a/parquet-variant/src/main/java/org/apache/parquet/variant/Variant.java +++ b/parquet-variant/src/main/java/org/apache/parquet/variant/Variant.java @@ -20,10 +20,15 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.UUID; /** * This Variant class holds the Variant-encoded value and metadata binary values. + * + *

Concurrency: the byte buffers are read-only and all lazy caches are idempotent, + * so concurrent reads are safe - the worst outcome is a redundant decode. The metadata + * dictionary cache is {@code volatile} for safe publication to child Variants. */ public final class Variant { /** @@ -42,7 +47,7 @@ public final class Variant { private final int dictSize; /** - * Lazy cache for metadata dictionary strings. + * Lazy cache for metadata dictionary strings, shared with child Variants. */ private volatile String[] metadataCache; @@ -76,10 +81,8 @@ public Variant(byte[] value, int valuePos, int valueLength, byte[] metadata, int } public Variant(ByteBuffer value, ByteBuffer metadata) { - // The buffers are read a single-byte at a time, so the endianness of the input buffers - // is not important. - this.value = value.asReadOnlyBuffer(); - this.metadata = metadata.asReadOnlyBuffer(); + this.value = value.asReadOnlyBuffer().order(ByteOrder.LITTLE_ENDIAN); + this.metadata = metadata.asReadOnlyBuffer().order(ByteOrder.LITTLE_ENDIAN); // There is currently only one allowed version. if ((metadata.get(metadata.position()) & VariantUtil.VERSION_MASK) != VariantUtil.VERSION) { @@ -92,7 +95,7 @@ public Variant(ByteBuffer value, ByteBuffer metadata) { int pos = this.metadata.position(); int metaOffsetSize = ((this.metadata.get(pos) >> 6) & 0x3) + 1; if (this.metadata.remaining() > 1) { - this.dictSize = VariantUtil.readUnsigned(this.metadata, pos + 1, metaOffsetSize); + this.dictSize = VariantUtil.readUnsignedLittleEndian(this.metadata, pos + 1, metaOffsetSize); } else { this.dictSize = 0; } @@ -103,8 +106,8 @@ public Variant(ByteBuffer value, ByteBuffer metadata) { * Package-private constructor that shares pre-parsed metadata state from a parent Variant. */ Variant(ByteBuffer value, ByteBuffer metadata, String[] metadataCache, int dictSize) { - this.value = value.asReadOnlyBuffer(); - this.metadata = metadata.asReadOnlyBuffer(); + this.value = value.asReadOnlyBuffer().order(ByteOrder.LITTLE_ENDIAN); + this.metadata = metadata.asReadOnlyBuffer().order(ByteOrder.LITTLE_ENDIAN); this.metadataCache = metadataCache; this.dictSize = dictSize; } @@ -253,10 +256,11 @@ public Variant getFieldByKey(String key) { if (info.numElements < BINARY_SEARCH_THRESHOLD) { for (int i = 0; i < info.numElements; ++i) { - int id = VariantUtil.readUnsigned(value, idStart + info.idSize * i, info.idSize); + int id = VariantUtil.readUnsignedLittleEndian(value, idStart + info.idSize * i, info.idSize); String fieldKey = getMetadataKeyCached(id); if (fieldKey.equals(key)) { - int offset = VariantUtil.readUnsigned(value, offsetStart + info.offsetSize * i, info.offsetSize); + int offset = VariantUtil.readUnsignedLittleEndian( + value, offsetStart + info.offsetSize * i, info.offsetSize); return childVariant(VariantUtil.slice(value, dataStart + offset)); } } @@ -268,7 +272,7 @@ public Variant getFieldByKey(String key) { // performance optimization, because it can properly handle the case where `low + high` // overflows int. int mid = (low + high) >>> 1; - int midId = VariantUtil.readUnsigned(value, idStart + info.idSize * mid, info.idSize); + int midId = VariantUtil.readUnsignedLittleEndian(value, idStart + info.idSize * mid, info.idSize); String midKey = getMetadataKeyCached(midId); int cmp = midKey.compareTo(key); if (cmp < 0) { @@ -276,7 +280,8 @@ public Variant getFieldByKey(String key) { } else if (cmp > 0) { high = mid - 1; } else { - int offset = VariantUtil.readUnsigned(value, offsetStart + info.offsetSize * mid, info.offsetSize); + int offset = VariantUtil.readUnsignedLittleEndian( + value, offsetStart + info.offsetSize * mid, info.offsetSize); return childVariant(VariantUtil.slice(value, dataStart + offset)); } } @@ -309,8 +314,8 @@ public ObjectField getFieldAtIndex(int idx) { int idStart = value.position() + info.idStartOffset; int offsetStart = value.position() + info.offsetStartOffset; int dataStart = value.position() + info.dataStartOffset; - int id = VariantUtil.readUnsigned(value, idStart + info.idSize * idx, info.idSize); - int offset = VariantUtil.readUnsigned(value, offsetStart + info.offsetSize * idx, info.offsetSize); + int id = VariantUtil.readUnsignedLittleEndian(value, idStart + info.idSize * idx, info.idSize); + int offset = VariantUtil.readUnsignedLittleEndian(value, offsetStart + info.offsetSize * idx, info.offsetSize); String key = getMetadataKeyCached(id); Variant v = childVariant(VariantUtil.slice(value, dataStart + offset)); return new ObjectField(key, v); @@ -339,7 +344,8 @@ public Variant getElementAtIndex(int index) { } int offsetStart = value.position() + info.offsetStartOffset; int dataStart = value.position() + info.dataStartOffset; - int offset = VariantUtil.readUnsigned(value, offsetStart + info.offsetSize * index, info.offsetSize); + int offset = + VariantUtil.readUnsignedLittleEndian(value, offsetStart + info.offsetSize * index, info.offsetSize); return childVariant(VariantUtil.slice(value, dataStart + offset)); } @@ -352,23 +358,25 @@ private Variant childVariant(ByteBuffer childValue) { /** * Returns the metadata dictionary string for the given ID, caching the result. + * All state is captured in locals so the calling thread retains needed values + * for the duration of the call regardless of concurrent access. */ String getMetadataKeyCached(int id) { - // Fall back to uncached lookup for out-of-range IDs if (id < 0 || id >= dictSize) { return VariantUtil.getMetadataKey(metadata, id); } - // Demand-create shared dictionary cache + // Demand-create shared dictionary cache. String[] cache = metadataCache; if (cache == null) { cache = new String[dictSize]; metadataCache = cache; - cache = metadataCache; } - if (cache[id] == null) { - cache[id] = VariantUtil.getMetadataKey(metadata, id); + String key = cache[id]; + if (key == null) { + key = VariantUtil.getMetadataKey(metadata, id); + cache[id] = key; } - return cache[id]; + return key; } /** diff --git a/parquet-variant/src/main/java/org/apache/parquet/variant/VariantUtil.java b/parquet-variant/src/main/java/org/apache/parquet/variant/VariantUtil.java index 4744f0c28d..0f39361506 100644 --- a/parquet-variant/src/main/java/org/apache/parquet/variant/VariantUtil.java +++ b/parquet-variant/src/main/java/org/apache/parquet/variant/VariantUtil.java @@ -299,6 +299,31 @@ static int readUnsigned(ByteBuffer bytes, int pos, int numBytes) { return result; } + /** + * Fast little-endian unsigned read using bulk ByteBuffer operations. + * Requires the buffer to have {@link java.nio.ByteOrder#LITTLE_ENDIAN} byte order. + * Adapted from Apache Iceberg's VariantUtil.readLittleEndianUnsigned. + */ + static int readUnsignedLittleEndian(ByteBuffer buffer, int pos, int numBytes) { + switch (numBytes) { + case 1: + return buffer.get(pos) & U8_MAX; + case 2: + return buffer.getShort(pos) & U16_MAX; + case 3: + return (buffer.getShort(pos) & U16_MAX) | ((buffer.get(pos + 2) & U8_MAX) << 16); + case 4: + int v = buffer.getInt(pos); + if (v < 0) { + throw new IllegalArgumentException( + String.format("Failed to read unsigned int. numBytes: %d", numBytes)); + } + return v; + default: + throw new IllegalArgumentException(String.format("Invalid numBytes: %d", numBytes)); + } + } + /** * Returns the value type of Variant value `value[pos...]`. It is only legal to call `get*` if * `getType` returns the corresponding type. For example, it is only legal to call