diff --git a/parquet-variant/src/main/java/org/apache/parquet/variant/Variant.java b/parquet-variant/src/main/java/org/apache/parquet/variant/Variant.java index 0ccb8359ba..272317ff76 100644 --- a/parquet-variant/src/main/java/org/apache/parquet/variant/Variant.java +++ b/parquet-variant/src/main/java/org/apache/parquet/variant/Variant.java @@ -20,10 +20,15 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.UUID; /** * This Variant class holds the Variant-encoded value and metadata binary values. + * + *
Concurrency: the byte buffers are read-only and every lazy cache is filled idempotently,
+ * so concurrent reads are safe - the worst outcome is a redundant decode. Only the metadata
+ * dictionary cache reference is {@code volatile}; the object/array header caches are plain
+ * fields, so their safety relies on the cached header objects being immutable.
*/
public final class Variant {
/**
@@ -36,6 +41,26 @@ public final class Variant {
*/
final ByteBuffer metadata;
+ /**
+ * Pre-computed metadata dictionary size: read from the metadata header by the public
+ * constructor, or copied from the parent when this instance is created as a child.
+ */
+ private final int dictSize;
+
+ /**
+ * Lazy cache for metadata dictionary strings, indexed by dictionary id. It stays {@code null}
+ * until it is allocated on demand; a child Variant is handed the reference that is current
+ * at the moment the child is created.
+ */
+ private volatile String[] metadataCache;
+
+ /**
+ * Lazy cache for the parsed object header; {@code null} until first parsed. Plain
+ * (non-volatile) field, so racing readers may each parse the header once.
+ */
+ private VariantUtil.ObjectInfo cachedObjectInfo;
+
+ /**
+ * Lazy cache for the parsed array header; {@code null} until first parsed. Plain
+ * (non-volatile) field, so racing readers may each parse the header once.
+ */
+ private VariantUtil.ArrayInfo cachedArrayInfo;
+
/**
* The threshold to switch from linear search to binary search when looking up a field by key in
* an object. This is a performance optimization to avoid the overhead of binary search for a
@@ -56,10 +81,8 @@ public Variant(byte[] value, int valuePos, int valueLength, byte[] metadata, int
}
public Variant(ByteBuffer value, ByteBuffer metadata) {
- // The buffers are read a single-byte at a time, so the endianness of the input buffers
- // is not important.
- this.value = value.asReadOnlyBuffer();
- this.metadata = metadata.asReadOnlyBuffer();
+ this.value = value.asReadOnlyBuffer().order(ByteOrder.LITTLE_ENDIAN);
+ this.metadata = metadata.asReadOnlyBuffer().order(ByteOrder.LITTLE_ENDIAN);
// There is currently only one allowed version.
if ((metadata.get(metadata.position()) & VariantUtil.VERSION_MASK) != VariantUtil.VERSION) {
@@ -67,6 +90,26 @@ public Variant(ByteBuffer value, ByteBuffer metadata) {
"Unsupported variant metadata version: %d",
metadata.get(metadata.position()) & VariantUtil.VERSION_MASK));
}
+
+ // Pre-compute dictionary size for lazy metadata cache allocation.
+ int pos = this.metadata.position();
+ int metaOffsetSize = ((this.metadata.get(pos) >> 6) & 0x3) + 1;
+ if (this.metadata.remaining() > 1) {
+ this.dictSize = VariantUtil.readUnsignedLittleEndian(this.metadata, pos + 1, metaOffsetSize);
+ } else {
+ this.dictSize = 0;
+ }
+ this.metadataCache = null;
+ }
+
+  /**
+   * Builds a Variant on behalf of a parent, reusing the parent's already-parsed metadata state
+   * instead of reading the metadata header again.
+   *
+   * @param value encoded value bytes; stored as a read-only, little-endian view
+   * @param metadata encoded metadata bytes; stored as a read-only, little-endian view
+   * @param metadataCache dictionary string cache to share with the parent, possibly {@code null}
+   * @param dictSize number of entries in the metadata dictionary, as computed by the parent
+   */
+  Variant(ByteBuffer value, ByteBuffer metadata, String[] metadataCache, int dictSize) {
+    final ByteBuffer valueView = value.asReadOnlyBuffer();
+    final ByteBuffer metadataView = metadata.asReadOnlyBuffer();
+    this.value = valueView.order(ByteOrder.LITTLE_ENDIAN);
+    this.metadata = metadataView.order(ByteOrder.LITTLE_ENDIAN);
+    this.dictSize = dictSize;
+    this.metadataCache = metadataCache;
}
public ByteBuffer getValueBuffer() {
@@ -194,7 +237,7 @@ public Type getType() {
* @throws IllegalArgumentException if `getType()` does not return `Type.OBJECT`
*/
public int numObjectElements() {
- return VariantUtil.getObjectInfo(value).numElements;
+ return objectInfo().numElements;
}
/**
@@ -206,22 +249,19 @@ public int numObjectElements() {
* @throws IllegalArgumentException if `getType()` does not return `Type.OBJECT`
*/
public Variant getFieldByKey(String key) {
- VariantUtil.ObjectInfo info = VariantUtil.getObjectInfo(value);
- // Use linear search for a short list. Switch to binary search when the length reaches
- // `BINARY_SEARCH_THRESHOLD`.
+ VariantUtil.ObjectInfo info = objectInfo();
+ int idStart = value.position() + info.idStartOffset;
+ int offsetStart = value.position() + info.offsetStartOffset;
+ int dataStart = value.position() + info.dataStartOffset;
+
if (info.numElements < BINARY_SEARCH_THRESHOLD) {
for (int i = 0; i < info.numElements; ++i) {
- ObjectField field = getFieldAtIndex(
- i,
- value,
- metadata,
- info.idSize,
- info.offsetSize,
- value.position() + info.idStartOffset,
- value.position() + info.offsetStartOffset,
- value.position() + info.dataStartOffset);
- if (field.key.equals(key)) {
- return field.value;
+ int id = VariantUtil.readUnsignedLittleEndian(value, idStart + info.idSize * i, info.idSize);
+ String fieldKey = getMetadataKeyCached(id);
+ if (fieldKey.equals(key)) {
+ int offset = VariantUtil.readUnsignedLittleEndian(
+ value, offsetStart + info.offsetSize * i, info.offsetSize);
+ return childVariant(VariantUtil.slice(value, dataStart + offset));
}
}
} else {
@@ -232,22 +272,17 @@ public Variant getFieldByKey(String key) {
// performance optimization, because it can properly handle the case where `low + high`
// overflows int.
int mid = (low + high) >>> 1;
- ObjectField field = getFieldAtIndex(
- mid,
- value,
- metadata,
- info.idSize,
- info.offsetSize,
- value.position() + info.idStartOffset,
- value.position() + info.offsetStartOffset,
- value.position() + info.dataStartOffset);
- int cmp = field.key.compareTo(key);
+ int midId = VariantUtil.readUnsignedLittleEndian(value, idStart + info.idSize * mid, info.idSize);
+ String midKey = getMetadataKeyCached(midId);
+ int cmp = midKey.compareTo(key);
if (cmp < 0) {
low = mid + 1;
} else if (cmp > 0) {
high = mid - 1;
} else {
- return field.value;
+ int offset = VariantUtil.readUnsignedLittleEndian(
+ value, offsetStart + info.offsetSize * mid, info.offsetSize);
+ return childVariant(VariantUtil.slice(value, dataStart + offset));
}
}
}
@@ -275,35 +310,14 @@ public ObjectField(String key, Variant value) {
* @throws IllegalArgumentException if `getType()` does not return `Type.OBJECT`
*/
public ObjectField getFieldAtIndex(int idx) {
- VariantUtil.ObjectInfo info = VariantUtil.getObjectInfo(value);
- // Use linear search for a short list. Switch to binary search when the length reaches
- // `BINARY_SEARCH_THRESHOLD`.
- ObjectField field = getFieldAtIndex(
- idx,
- value,
- metadata,
- info.idSize,
- info.offsetSize,
- value.position() + info.idStartOffset,
- value.position() + info.offsetStartOffset,
- value.position() + info.dataStartOffset);
- return field;
- }
-
- static ObjectField getFieldAtIndex(
- int index,
- ByteBuffer value,
- ByteBuffer metadata,
- int idSize,
- int offsetSize,
- int idStart,
- int offsetStart,
- int dataStart) {
- // idStart, offsetStart, and dataStart are absolute positions in the `value` buffer.
- int id = VariantUtil.readUnsigned(value, idStart + idSize * index, idSize);
- int offset = VariantUtil.readUnsigned(value, offsetStart + offsetSize * index, offsetSize);
- String key = VariantUtil.getMetadataKey(metadata, id);
- Variant v = new Variant(VariantUtil.slice(value, dataStart + offset), metadata);
+ VariantUtil.ObjectInfo info = objectInfo();
+ int idStart = value.position() + info.idStartOffset;
+ int offsetStart = value.position() + info.offsetStartOffset;
+ int dataStart = value.position() + info.dataStartOffset;
+ int id = VariantUtil.readUnsignedLittleEndian(value, idStart + info.idSize * idx, info.idSize);
+ int offset = VariantUtil.readUnsignedLittleEndian(value, offsetStart + info.offsetSize * idx, info.offsetSize);
+ String key = getMetadataKeyCached(id);
+ Variant v = childVariant(VariantUtil.slice(value, dataStart + offset));
return new ObjectField(key, v);
}
@@ -312,7 +326,7 @@ static ObjectField getFieldAtIndex(
* @throws IllegalArgumentException if `getType()` does not return `Type.ARRAY`
*/
public int numArrayElements() {
- return VariantUtil.getArrayInfo(value).numElements;
+ return arrayInfo().numElements;
}
/**
@@ -324,23 +338,68 @@ public int numArrayElements() {
* @throws IllegalArgumentException if `getType()` does not return `Type.ARRAY`
*/
public Variant getElementAtIndex(int index) {
- VariantUtil.ArrayInfo info = VariantUtil.getArrayInfo(value);
+ VariantUtil.ArrayInfo info = arrayInfo();
if (index < 0 || index >= info.numElements) {
return null;
}
- return getElementAtIndex(
- index,
- value,
- metadata,
- info.offsetSize,
- value.position() + info.offsetStartOffset,
- value.position() + info.dataStartOffset);
+ int offsetStart = value.position() + info.offsetStartOffset;
+ int dataStart = value.position() + info.dataStartOffset;
+ int offset =
+ VariantUtil.readUnsignedLittleEndian(value, offsetStart + info.offsetSize * index, info.offsetSize);
+ return childVariant(VariantUtil.slice(value, dataStart + offset));
+ }
+
+  /**
+   * Creates a child Variant over {@code childValue} that shares this instance's metadata
+   * buffer, dictionary size and dictionary string cache.
+   *
+   * <p>The cache array is allocated here when it does not exist yet. Passing a {@code null}
+   * reference down would leave every child to build a private cache later on, so siblings
+   * (for example the elements of an array) would decode the same dictionary keys repeatedly.
+   * Racing threads may each allocate an array; every child still receives a usable one.
+   *
+   * @param childValue slice of the value buffer that holds the child's encoded value
+   * @return a Variant for the child value
+   */
+  private Variant childVariant(ByteBuffer childValue) {
+    // Read the volatile field once; everything below works on this snapshot.
+    String[] cache = metadataCache;
+    // A dictionary entry occupies at least one offset byte in the metadata, so a dictSize larger
+    // than the metadata buffer can only come from corrupt input; skip the allocation in that case.
+    if (cache == null && dictSize > 0 && dictSize <= metadata.remaining()) {
+      cache = new String[dictSize];
+      metadataCache = cache;
+    }
+    return new Variant(childValue, metadata, cache, dictSize);
}
- private static Variant getElementAtIndex(
- int index, ByteBuffer value, ByteBuffer metadata, int offsetSize, int offsetStart, int dataStart) {
- // offsetStart and dataStart are absolute positions in the `value` buffer.
- int offset = VariantUtil.readUnsigned(value, offsetStart + offsetSize * index, offsetSize);
- return new Variant(VariantUtil.slice(value, dataStart + offset), metadata);
+ /**
+ * Returns the metadata dictionary string for the given ID, caching the result.
+ *
+ * <p>IDs outside {@code [0, dictSize)} bypass the cache and are passed straight to
+ * {@code VariantUtil.getMetadataKey}, which is left to deal with them (presumably by
+ * throwing - confirm against VariantUtil). The cache array is allocated on first use and
+ * each slot is filled the first time its ID is requested.
+ *
+ * <p>The {@code volatile} cache reference is read once into a local, so the rest of the call
+ * works on a single array even if another thread replaces the field concurrently. Racing
+ * threads may decode the same key more than once.
+ *
+ * @param id dictionary ID of the key to resolve
+ * @return the dictionary string for {@code id}
+ */
+ String getMetadataKeyCached(int id) {
+ if (id < 0 || id >= dictSize) {
+ return VariantUtil.getMetadataKey(metadata, id);
+ }
+ // Demand-create the dictionary cache; later lookups and children created afterwards reuse it.
+ String[] cache = metadataCache;
+ if (cache == null) {
+ cache = new String[dictSize];
+ metadataCache = cache;
+ }
+ String key = cache[id];
+ if (key == null) {
+ key = VariantUtil.getMetadataKey(metadata, id);
+ cache[id] = key;
+ }
+ return key;
+ }
+
+  /**
+   * Returns the parsed header of this object value, reusing the copy memoized by an earlier call.
+   */
+  private VariantUtil.ObjectInfo objectInfo() {
+    final VariantUtil.ObjectInfo memoized = cachedObjectInfo;
+    if (memoized != null) {
+      return memoized;
+    }
+    // First access: parse the header from the value buffer and remember it.
+    final VariantUtil.ObjectInfo parsed = VariantUtil.getObjectInfo(value);
+    cachedObjectInfo = parsed;
+    return parsed;
+  }
+
+  /**
+   * Returns the parsed header of this array value, reusing the copy memoized by an earlier call.
+   */
+  private VariantUtil.ArrayInfo arrayInfo() {
+    final VariantUtil.ArrayInfo memoized = cachedArrayInfo;
+    if (memoized != null) {
+      return memoized;
+    }
+    // First access: parse the header from the value buffer and remember it.
+    final VariantUtil.ArrayInfo parsed = VariantUtil.getArrayInfo(value);
+    cachedArrayInfo = parsed;
+    return parsed;
}
}
diff --git a/parquet-variant/src/main/java/org/apache/parquet/variant/VariantBuilder.java b/parquet-variant/src/main/java/org/apache/parquet/variant/VariantBuilder.java
index 4b8eb6d8c3..213f69729f 100644
--- a/parquet-variant/src/main/java/org/apache/parquet/variant/VariantBuilder.java
+++ b/parquet-variant/src/main/java/org/apache/parquet/variant/VariantBuilder.java
@@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Set;
+import org.apache.parquet.io.api.Binary;
/**
* Builder for creating Variant value and metadata.
@@ -109,7 +110,14 @@ public void appendEncodedValue(ByteBuffer value) {
*/
public void appendString(String str) {
onAppend();
- byte[] data = str.getBytes(StandardCharsets.UTF_8);
+ writeUTF8bytes(str.getBytes(StandardCharsets.UTF_8));
+ }
+
+ /**
+ * Write bytes as a UTF8 string.
+ * @param data data to write; this is not modified.
+ */
+ private void writeUTF8bytes(final byte[] data) {
boolean longStr = data.length > VariantUtil.MAX_SHORT_STR_SIZE;
checkCapacity((longStr ? 1 + VariantUtil.U32_SIZE : 1) + data.length);
if (longStr) {
@@ -125,6 +133,16 @@ public void appendString(String str) {
writePos += data.length;
}
+  /**
+   * Appends the contents of a Binary to the variant as a string value.
+   * The bytes are written as they are, without decoding them into an intermediate String,
+   * which is useful when unmarshalling from shredded string columns.
+   *
+   * @param binary source data; its bytes are expected to be UTF-8 encoded text.
+   */
+  void appendAsString(Binary binary) {
+    onAppend();
+    final byte[] utf8 = binary.getBytesUnsafe();
+    writeUTF8bytes(utf8);
+  }
+
/**
* Appends a null value to the Variant builder.
*/
diff --git a/parquet-variant/src/main/java/org/apache/parquet/variant/VariantConverters.java b/parquet-variant/src/main/java/org/apache/parquet/variant/VariantConverters.java
index 6d0986c2b2..bda088c55d 100644
--- a/parquet-variant/src/main/java/org/apache/parquet/variant/VariantConverters.java
+++ b/parquet-variant/src/main/java/org/apache/parquet/variant/VariantConverters.java
@@ -233,6 +233,7 @@ static class PartiallyShreddedFieldsConverter extends GroupConverter {
PartiallyShreddedFieldsConverter(GroupType fieldsType, ParentConverter