This is an automated email from the ASF dual-hosted git repository.
Fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-java.git
The following commit(s) were added to refs/heads/master by this push:
new b38a6f9ef GH-3478: Fix LZ4_RAW heap decompression (#3486)
b38a6f9ef is described below
commit b38a6f9ef981114fccb143ac1fa7d4596df1cb84
Author: Anupam Yadav <[email protected]>
AuthorDate: Wed May 6 12:54:03 2026 -0700
GH-3478: Fix LZ4_RAW heap decompression (#3486)
Eagerly materialize the decompressed stream for LZ4_RAW in CodecFactory,
matching the existing pattern used for ZSTD. Without this, the lazy
StreamBytesInput.writeInto() path reads via Channels.newChannel() in
~8KB chunks, but LZ4_RAW decompresses a page in one shot, so each chunked
read calls the decompressor with an output buffer that is too small for
the full page.
Added a regression test that compresses and decompresses a 16KB page
through the CodecFactory heap path, then calls BytesInput.copy() to
exercise the chunked materialization code path. The test fails without
the fix and passes with it.
Co-authored-by: Anupam Yadav <[email protected]>
---
.../org/apache/parquet/hadoop/CodecFactory.java | 12 ++++---
.../parquet/hadoop/codec/TestCompressionCodec.java | 37 ++++++++++++++++++++++
2 files changed, 44 insertions(+), 5 deletions(-)
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
index eee5fa608..98b49835a 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
@@ -39,6 +39,7 @@ import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.conf.HadoopParquetConfiguration;
import org.apache.parquet.conf.ParquetConfiguration;
+import org.apache.parquet.hadoop.codec.Lz4RawCodec;
import org.apache.parquet.hadoop.codec.ZstandardCodec;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.ConfigurationUtil;
@@ -170,11 +171,12 @@ public class CodecFactory implements CompressionCodecFactory {
}
InputStream is = codec.createInputStream(bytes.toInputStream(),
decompressor);
- // We need to explicitly close the ZstdDecompressorStream here to release the resources it holds to
- // avoid off-heap memory fragmentation issue, see https://github.com/apache/parquet-format/issues/398.
- // This change will load the decompressor stream into heap a little earlier, since the problem it solves
- // only happens in the ZSTD codec, so this modification is only made for ZSTD streams.
- if (codec instanceof ZstandardCodec) {
+ // Eagerly materialize the decompressed stream for codecs that require all input in a single buffer.
+ // ZSTD: releases off-heap resources early to avoid fragmentation (see parquet-format#398).
+ // LZ4_RAW: requires one-shot decompression; the lazy StreamBytesInput.writeInto() path reads via
+ // Channels.newChannel() in ~8KB chunks, causing the decompressor to be called with an undersized
+ // output buffer (see #3478).
+ if (codec instanceof ZstandardCodec || codec instanceof Lz4RawCodec) {
decompressed = BytesInput.copy(BytesInput.from(is, decompressedSize));
is.close();
} else {
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java
index f202c6350..ae2bd87ac 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java
@@ -33,7 +33,13 @@ import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.parquet.bytes.ByteBufferReleaser;
import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
+import org.apache.parquet.bytes.TrackingByteBufferAllocator;
+import
org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor;
+import
org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
+import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.junit.Assert;
import org.junit.Test;
@@ -41,6 +47,8 @@ import org.mockito.Mockito;
public class TestCompressionCodec {
+ private final int pageSize = 64 * 1024;
+
@Test
public void testLz4RawBlock() throws IOException {
testBlock(CompressionCodecName.LZ4_RAW);
@@ -177,6 +185,35 @@ public class TestCompressionCodec {
}
}
+ /**
+ * Regression test for #3478: LZ4_RAW heap decompression fails when the decompressed page
+ * exceeds the ~8KB chunk size used by stream materialization in BytesInput.copy().
+ */
+ @Test
+ public void testLz4RawHeapDecompressorCanCopyLargePage() throws IOException {
+ final int size = 16 * 1024;
+ final byte[] raw = new byte[size];
+ new Random(42).nextBytes(raw);
+
+ try (TrackingByteBufferAllocator allocator =
+ TrackingByteBufferAllocator.wrap(new DirectByteBufferAllocator());
+ ByteBufferReleaser releaser = new ByteBufferReleaser(allocator)) {
+ CodecFactory heapCodecFactory = new CodecFactory(new Configuration(),
pageSize);
+ BytesInputCompressor compressor =
heapCodecFactory.getCompressor(CompressionCodecName.LZ4_RAW);
+ BytesInputDecompressor decompressor =
heapCodecFactory.getDecompressor(CompressionCodecName.LZ4_RAW);
+
+ BytesInput compressed = compressor.compress(BytesInput.from(raw));
+ BytesInput decompressed = decompressor.decompress(compressed, size);
+
+ BytesInput copied = decompressed.copy(releaser);
+ Assert.assertArrayEquals(raw, copied.toByteArray());
+
+ compressor.release();
+ decompressor.release();
+ heapCodecFactory.release();
+ }
+ }
+
@Test
public void TestDecompressorInvalidState() throws IOException {
// Create a mock Decompressor that returns 0 when decompress is called.