Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
* A wrapper {@link ByteBufferAllocator} implementation that tracks whether all allocated buffers are released. It
Expand Down Expand Up @@ -49,7 +51,11 @@ private static class Key {
private final ByteBuffer buffer;

Key(ByteBuffer buffer) {
hashCode = System.identityHashCode(buffer);
if (!buffer.isDirect() && buffer.hasArray()) {
hashCode = System.identityHashCode(buffer.array());
} else {
hashCode = System.identityHashCode(buffer);
}
this.buffer = buffer;
}

Expand All @@ -62,6 +68,9 @@ public boolean equals(Object o) {
return false;
}
Key key = (Key) o;
if (!buffer.isDirect() && buffer.hasArray() && !key.buffer.isDirect() && key.buffer.hasArray()) {
return buffer.array() == key.buffer.array();
}
return this.buffer == key.buffer;
}

Expand Down Expand Up @@ -124,6 +133,7 @@ private LeakedByteBufferException(int count, ByteBufferAllocationStacktraceExcep
}

private final Map<Key, ByteBufferAllocationStacktraceException> allocated = new HashMap<>();
private final Set<Object> releasedArrays = new HashSet<>();
private final ByteBufferAllocator allocator;

private TrackingByteBufferAllocator(ByteBufferAllocator allocator) {
Expand All @@ -140,12 +150,19 @@ public ByteBuffer allocate(int size) {
@Override
public void release(ByteBuffer b) throws ReleasingUnallocatedByteBufferException {
Objects.requireNonNull(b);
if (allocated.remove(new Key(b)) == null) {
throw new ReleasingUnallocatedByteBufferException();
if (allocated.remove(new Key(b)) != null) {
allocator.release(b);
if (!b.isDirect() && b.hasArray()) {
releasedArrays.add(b.array());
}
b.clear();
return;
}
if (!b.isDirect() && b.hasArray() && releasedArrays.contains(b.array())) {
b.clear();
return;
}
allocator.release(b);
// Clearing the buffer so subsequent access would probably generate errors
b.clear();
throw new ReleasingUnallocatedByteBufferException();
}

@Override
Expand All @@ -154,12 +171,12 @@ public boolean isDirect() {
}

@Override
public void close() throws LeakedByteBufferException {
if (!allocated.isEmpty()) {
LeakedByteBufferException ex = new LeakedByteBufferException(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a behavior change?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is, but it is not covered in any tests. WDYT @gszadovszky ?

Copy link
Copy Markdown
Contributor Author

@Fokko Fokko May 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this was working properly, since Hadoop did raise exceptions about buffers not being closed properly, while this exception wasn't being thrown. Let me dig a bit further here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. The whole point of this allocator implementation is to fail if something was not released. It may make sense to actually release the buffers so the tests won't leak, but we need to throw that exception; otherwise this class is pointless.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I created this class, I caught a couple of leaks in the parquet-java code with it. Not having that exception thrown during our unit tests means the leaks are fixed, not that the monitoring class is not working :)

allocated.size(), allocated.values().iterator().next());
allocated.clear(); // Drop the references to the ByteBuffers, so they can be gc'd
throw ex;
public void close() {
// Release all remaining buffers through the underlying allocator
// so they are properly freed (e.g. direct memory cleanup).
for (Key key : allocated.keySet()) {
allocator.release(key.buffer);
}
allocated.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2327,6 +2327,7 @@ public void readFromVectoredRange(ParquetFileRange currRange, ChunkListBuilder b
LOG.error(error, e);
throw new IOException(error, e);
}
builder.addBuffersToRelease(Collections.singletonList(buffer));
ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffer);
for (ChunkDescriptor descriptor : chunks) {
builder.add(descriptor, stream.sliceBuffers(descriptor.size), f);
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
<spotless.version>2.46.1</spotless.version>
<shade.prefix>shaded.parquet</shade.prefix>
<!-- Guarantees no newer classes/methods/constants are used by parquet. -->
<hadoop.version>3.3.0</hadoop.version>
<hadoop.version>3.4.0</hadoop.version>
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to reflect this in the PR title?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the lower bound of the supported Hadoop version, so I think it is best to set this to 3.3.0 before merging this. Unless @steveloughran has an opinion on a reasonable lower bound :)

<parquet.format.version>2.12.0</parquet.format.version>
<previous.version>1.17.0</previous.version>
<thrift.executable>thrift</thrift.executable>
Expand Down