This is an automated email from the ASF dual-hosted git repository.
Fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-java.git
The following commit(s) were added to refs/heads/master by this push:
new 94284873f GH-3484: Eliminate per-page heap allocation for CRC32 (#3485)
94284873f is described below
commit 94284873f403766504748d9c79f82961b77d80af
Author: André Rouél <[email protected]>
AuthorDate: Mon May 18 06:04:13 2026 +0200
GH-3484: Eliminate per-page heap allocation for CRC32 (#3485)
* GH-3484 Eliminate per-page heap allocation for CRC32 checksums when using
direct `ByteBufferAllocator`
Why this is safe
- CRC32.update(ByteBuffer) has existed since Java 8 (and on the Checksum
interface since Java 9); it processes the bytes from position to limit,
advancing position to limit.
- toByteBuffer(releaser) returns either a slice() of the internal buffer
(independent position) or a freshly allocated copy. Either way, the original
BytesInput is unaffected for the subsequent buf.collect() call, because
ByteBufferBytesInput.writeInto() uses buffer.duplicate().
- When the allocator is direct, toByteBuffer(releaser) hands back a direct
buffer, so no heap copy is made. When the allocator is heap-based, behavior
is functionally equivalent to the old toByteArray() path.
- The releaser field already exists on ColumnChunkPageWriter (line 124) and
manages buffer lifecycle.
* GH-3484 Release CRC32 ByteBuffers per page to avoid leak across column
chunk
The previous change reused the ColumnChunkPageWriter's long-lived
ByteBufferReleaser for the CRC32 update buffers. That releaser only
closes when the column chunk writer closes (end of row group), so
every per-page allocation made by BytesInput.toByteBuffer(releaser)
accumulated until then. With many pages in a single large column
chunk, this exhausted the heap (TestLargeColumnChunk hit an OOM in CI).
Scope each CRC update to a try-with-resources ByteBufferReleaser so
allocated buffers are returned to the allocator immediately after
crc.update(). writePageV2 shares one per-page releaser across the
three updates.
---
.../parquet/hadoop/ColumnChunkPageWriteStore.java | 24 ++++++++++++++--------
1 file changed, 15 insertions(+), 9 deletions(-)
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index d9e6ea099..c0cf216cc 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -94,6 +94,7 @@ public class ColumnChunkPageWriteStore implements
PageWriteStore, BloomFilterWri
private Statistics totalStatistics;
private final SizeStatistics totalSizeStatistics;
private final GeospatialStatistics totalGeospatialStatistics;
+ private final ByteBufferAllocator allocator;
private final ByteBufferReleaser releaser;
private final CRC32 crc;
@@ -121,6 +122,7 @@ public class ColumnChunkPageWriteStore implements
PageWriteStore, BloomFilterWri
int columnOrdinal) {
this.path = path;
this.compressor = compressor;
+ this.allocator = allocator;
this.releaser = new ByteBufferReleaser(allocator);
this.buf = new ConcatenatingByteBufferCollector(allocator);
this.columnIndexBuilder =
ColumnIndexBuilder.getBuilder(path.getPrimitiveType(),
columnIndexTruncateLength);
@@ -217,7 +219,9 @@ public class ColumnChunkPageWriteStore implements
PageWriteStore, BloomFilterWri
}
if (pageWriteChecksumEnabled) {
crc.reset();
- crc.update(compressedBytes.toByteArray());
+ try (ByteBufferReleaser pageReleaser = new
ByteBufferReleaser(allocator)) {
+ crc.update(compressedBytes.toByteBuffer(pageReleaser));
+ }
parquetMetadataConverter.writeDataPageV1Header(
(int) uncompressedSize,
(int) compressedSize,
@@ -321,14 +325,16 @@ public class ColumnChunkPageWriteStore implements
PageWriteStore, BloomFilterWri
}
if (pageWriteChecksumEnabled) {
crc.reset();
- if (repetitionLevels.size() > 0) {
- crc.update(repetitionLevels.toByteArray());
- }
- if (definitionLevels.size() > 0) {
- crc.update(definitionLevels.toByteArray());
- }
- if (compressedData.size() > 0) {
- crc.update(compressedData.toByteArray());
+ try (ByteBufferReleaser pageReleaser = new
ByteBufferReleaser(allocator)) {
+ if (repetitionLevels.size() > 0) {
+ crc.update(repetitionLevels.toByteBuffer(pageReleaser));
+ }
+ if (definitionLevels.size() > 0) {
+ crc.update(definitionLevels.toByteBuffer(pageReleaser));
+ }
+ if (compressedData.size() > 0) {
+ crc.update(compressedData.toByteBuffer(pageReleaser));
+ }
}
parquetMetadataConverter.writeDataPageV2Header(
uncompressedSize,