Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions parquet-variant/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,17 @@
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public final class Variant {
* Lazy cache for the parsed array header.
*/
private VariantUtil.ArrayInfo cachedArrayInfo;
/** Nesting depth of this Variant relative to the top-level value (0 = top-level). */
private final int depth;

/**
* The threshold to switch from linear search to binary search when looking up a field by key in
Expand All @@ -84,7 +86,7 @@ public Variant(byte[] value, int valuePos, int valueLength, byte[] metadata, int
public Variant(ByteBuffer value, ByteBuffer metadata) {
this.value = value.asReadOnlyBuffer().order(ByteOrder.LITTLE_ENDIAN);
this.metadata = metadata.asReadOnlyBuffer().order(ByteOrder.LITTLE_ENDIAN);

this.depth = 0;
// There is currently only one allowed version.
if ((metadata.get(metadata.position()) & VariantUtil.VERSION_MASK) != VariantUtil.VERSION) {
throw new UnsupportedOperationException(String.format(
Expand All @@ -108,16 +110,23 @@ public Variant(ByteBuffer value, ByteBuffer metadata) {
this.dictSize = 0;
}
this.metadataCache = null;
VariantUtil.validateValueShallow(this.value, dictSize);
}

/**
* Package-private constructor that shares pre-parsed metadata state from a parent Variant.
*/
Variant(ByteBuffer value, ByteBuffer metadata, String[] metadataCache, int dictSize) {
Variant(ByteBuffer value, ByteBuffer metadata, String[] metadataCache, int dictSize, int depth) {
Preconditions.checkArgument(
depth <= VariantUtil.MAX_VARIANT_DEPTH,
"variant nesting depth exceeds maximum %s",
VariantUtil.MAX_VARIANT_DEPTH);
this.value = value.asReadOnlyBuffer().order(ByteOrder.LITTLE_ENDIAN);
this.metadata = metadata.asReadOnlyBuffer().order(ByteOrder.LITTLE_ENDIAN);
this.metadataCache = metadataCache;
this.dictSize = dictSize;
this.depth = depth;
VariantUtil.validateValueShallow(this.value, dictSize);
}

public ByteBuffer getValueBuffer() {
Expand Down Expand Up @@ -361,7 +370,7 @@ public Variant getElementAtIndex(int index) {
* Creates a child Variant that shares this instance's metadata cache.
*/
private Variant childVariant(ByteBuffer childValue) {
return new Variant(childValue, metadata, metadataCache, dictSize);
return new Variant(childValue, metadata, metadataCache, dictSize, depth + 1);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.parquet.variant;

import static org.apache.parquet.variant.VariantUtil.MAX_VARIANT_DEPTH;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
Expand All @@ -37,7 +39,7 @@ public final class VariantJsonParser {

private static final JsonFactory JSON_FACTORY = JsonFactory.builder()
.streamReadConstraints(StreamReadConstraints.builder()
.maxNestingDepth(500)
.maxNestingDepth(MAX_VARIANT_DEPTH)
.maxStringLength(10_000_000)
.maxDocumentLength(50_000_000L)
.build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,12 @@ class VariantUtil {
// The size (in bytes) of a UUID.
static final int UUID_SIZE = 16;

/**
* Maximum permitted nesting depth of a Variant value.
* same limit as in VariantJsonParser.
*/
static final int MAX_VARIANT_DEPTH = 500;

// header bytes
static final byte HEADER_NULL = primitiveHeader(NULL);
static final byte HEADER_LONG_STRING = primitiveHeader(LONG_STR);
Expand Down Expand Up @@ -874,6 +880,158 @@ static HashMap<String, Integer> getMetadataMap(ByteBuffer metadata) {
return result;
}

/**
* Bounds-checks a single Variant value node against its buffer slot. Performs no recursion
* into nested children: child nodes are checked on demand when callers descend into them.
*
* <p>Cost: O(1) for primitives and short strings, O(numElements) for objects and arrays.
* Validation of nested structures is deferred so that opening a large well-formed Variant
* is not penalized by sub-trees the caller never inspects.
*
* @param valueBuffer the variant value buffer (position/limit define the extent of this node's slot)
* @param dictSize the metadata dictionary size, used to bound object field ids
* @throws IllegalArgumentException if the value header or container table does not fit within
* the buffer slot, or if any object field id is out of range
*/
static void validateValueShallow(final ByteBuffer valueBuffer, final int dictSize) {
int pos = valueBuffer.position();
Preconditions.checkArgument(pos >= 0 && pos < valueBuffer.limit(), "variant value is empty");
long slot = (long) valueBuffer.limit() - pos;
int header = valueBuffer.get(pos) & 0xFF;
int basicType = header & BASIC_TYPE_MASK;
int typeInfo = (header >> BASIC_TYPE_BITS) & PRIMITIVE_TYPE_MASK;
switch (basicType) {
case SHORT_STR:
Preconditions.checkArgument(1L + typeInfo <= slot, "variant short string extends past buffer");
return;
case OBJECT:
validateContainerShallow(valueBuffer, typeInfo, pos, slot, dictSize, true);
return;
case ARRAY:
validateContainerShallow(valueBuffer, typeInfo, pos, slot, dictSize, false);
return;
default:
validatePrimitiveShallow(valueBuffer, typeInfo, pos, slot);
}
}

/**
* Shallow validation of a container.
*
* @param valueBuffer buffer with the variant data
* @param typeInfo type information from the metadata
* @param pos buffer read position
* @param length length of slot for this primitive.
* @param dictSize dictionary size
* @param isObject is this an object?
*/
private static void validateContainerShallow(
final ByteBuffer valueBuffer,
final int typeInfo,
final int pos,
final long length,
final int dictSize,
final boolean isObject) {
boolean largeSize;
int idSize;
if (isObject) {
largeSize = ((typeInfo >> 4) & 0x1) != 0;
idSize = ((typeInfo >> 2) & 0x3) + 1;
} else {
largeSize = ((typeInfo >> 2) & 0x1) != 0;
idSize = 0;
}
int offsetSize = (typeInfo & 0x3) + 1;
int sizeBytes = largeSize ? U32_SIZE : 1;
Preconditions.checkArgument(1L + sizeBytes <= length, "variant container header truncated");
int numElements = readUnsigned(valueBuffer, pos + 1, sizeBytes);
long idStart = 1L + sizeBytes;
long idBytes = isObject ? (long) numElements * idSize : 0L;
long offsetStart = idStart + idBytes;
long offsetBytes = (long) (numElements + 1) * offsetSize;
long dataStart = offsetStart + offsetBytes;
Preconditions.checkArgument(
dataStart <= length, "variant container offset table extends past buffer: numElements=%s", numElements);
long dataLen = length - dataStart;
if (isObject) {
for (int i = 0; i < numElements; i++) {
int id = readUnsigned(valueBuffer, pos + (int) idStart + i * idSize, idSize);
Preconditions.checkArgument(
id < dictSize, "variant object key id %s out of range (dictSize=%s)", id, dictSize);
}
}
// Each child offset must lie within the data region. Children may overlap or leave gaps;
// the trailing terminator offset is range-checked for the same reason.
for (int i = 0; i <= numElements; i++) {
// O(elements)
int off = readUnsigned(valueBuffer, pos + (int) offsetStart + i * offsetSize, offsetSize);
Preconditions.checkArgument(
off <= dataLen, "variant child offset out of range: %s (data length %s)", off, dataLen);
}
}

/**
* Validate the primitive type. The buffer must have enough capacity
* for the given type (and of course, the type must be recognized).
*
* @param valueBuffer buffer with the variant data
* @param typeInfo type information from the metadata
* @param pos buffer read position
* @param length length of slot for this primitive.
*/
private static void validatePrimitiveShallow(
final ByteBuffer valueBuffer, final int typeInfo, final int pos, final long length) {
long size;
switch (typeInfo) {
case NULL:
case TRUE:
case FALSE:
size = 1;
break;
case INT8:
size = 2;
break;
case INT16:
size = 3;
break;
case INT32:
case DATE:
case FLOAT:
size = 5;
break;
case INT64:
case DOUBLE:
case TIMESTAMP_TZ:
case TIMESTAMP_NTZ:
case TIME:
case TIMESTAMP_NANOS_TZ:
case TIMESTAMP_NANOS_NTZ:
size = 9;
break;
case DECIMAL4:
size = 6;
break;
case DECIMAL8:
size = 10;
break;
case DECIMAL16:
size = 18;
break;
case BINARY:
case LONG_STR: {
Preconditions.checkArgument(1L + U32_SIZE <= length, "variant string/binary length field truncated");
size = 1L + U32_SIZE + readUnsigned(valueBuffer, pos + 1, U32_SIZE);
break;
}
case UUID:
size = 1L + UUID_SIZE;
break;
default:
throw new IllegalArgumentException(String.format("Unknown primitive type in variant: %d", typeInfo));
}
Preconditions.checkArgument(size <= length, "variant value extends past buffer");
}

/**
* Computes the actual size (in bytes) of the Variant value.
* @param value The Variant value binary
Expand Down
Loading