This is an automated email from the ASF dual-hosted git repository.

Fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-java.git


The following commit(s) were added to refs/heads/master by this push:
     new b38a6f9ef GH-3478: Fix LZ4_RAW heap decompression (#3486)
b38a6f9ef is described below

commit b38a6f9ef981114fccb143ac1fa7d4596df1cb84
Author: Anupam Yadav <[email protected]>
AuthorDate: Wed May 6 12:54:03 2026 -0700

    GH-3478: Fix LZ4_RAW heap decompression (#3486)
    
    Eagerly materialize the decompressed stream for LZ4_RAW in CodecFactory,
    matching the existing pattern used for ZSTD. Without this, the lazy
    StreamBytesInput.writeInto() path reads via Channels.newChannel() in
    ~8KB chunks, but LZ4_RAW requires all compressed input in a single
    buffer for one-shot decompression.
    
    Added a regression test that compresses and decompresses a 16KB page
    through the CodecFactory heap path, then calls BytesInput.copy() to
    exercise the chunked materialization code path. The test fails without
    the fix and passes with it.
    
    Co-authored-by: Anupam Yadav <[email protected]>
---
 .../org/apache/parquet/hadoop/CodecFactory.java    | 12 ++++---
 .../parquet/hadoop/codec/TestCompressionCodec.java | 37 ++++++++++++++++++++++
 2 files changed, 44 insertions(+), 5 deletions(-)

diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
index eee5fa608..98b49835a 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
@@ -39,6 +39,7 @@ import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.compression.CompressionCodecFactory;
 import org.apache.parquet.conf.HadoopParquetConfiguration;
 import org.apache.parquet.conf.ParquetConfiguration;
+import org.apache.parquet.hadoop.codec.Lz4RawCodec;
 import org.apache.parquet.hadoop.codec.ZstandardCodec;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.util.ConfigurationUtil;
@@ -170,11 +171,12 @@ public class CodecFactory implements 
CompressionCodecFactory {
       }
       InputStream is = codec.createInputStream(bytes.toInputStream(), 
decompressor);
 
-      // We need to explicitly close the ZstdDecompressorStream here to 
release the resources it holds to
-      // avoid off-heap memory fragmentation issue, see 
https://github.com/apache/parquet-format/issues/398.
-      // This change will load the decompressor stream into heap a little 
earlier, since the problem it solves
-      // only happens in the ZSTD codec, so this modification is only made for 
ZSTD streams.
-      if (codec instanceof ZstandardCodec) {
+      // Eagerly materialize the decompressed stream for codecs that require 
all input in a single buffer.
+      // ZSTD: releases off-heap resources early to avoid fragmentation (see 
parquet-format#398).
+      // LZ4_RAW: requires one-shot decompression; the lazy 
StreamBytesInput.writeInto() path reads via
+      // Channels.newChannel() in ~8KB chunks, causing the decompressor to be 
called with an undersized
+      // output buffer (see #3478).
+      if (codec instanceof ZstandardCodec || codec instanceof Lz4RawCodec) {
         decompressed = BytesInput.copy(BytesInput.from(is, decompressedSize));
         is.close();
       } else {
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java
index f202c6350..ae2bd87ac 100644
--- 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java
@@ -33,7 +33,13 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.parquet.bytes.ByteBufferReleaser;
 import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
+import org.apache.parquet.bytes.TrackingByteBufferAllocator;
+import 
org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor;
+import 
org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
+import org.apache.parquet.hadoop.CodecFactory;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.junit.Assert;
 import org.junit.Test;
@@ -41,6 +47,8 @@ import org.mockito.Mockito;
 
 public class TestCompressionCodec {
 
+  private final int pageSize = 64 * 1024;
+
   @Test
   public void testLz4RawBlock() throws IOException {
     testBlock(CompressionCodecName.LZ4_RAW);
@@ -177,6 +185,35 @@ public class TestCompressionCodec {
     }
   }
 
+  /**
+   * Regression test for #3478: LZ4_RAW heap decompression fails when the 
decompressed page
+   * exceeds the ~8KB chunk size used by stream materialization in 
BytesInput.copy().
+   */
+  @Test
+  public void testLz4RawHeapDecompressorCanCopyLargePage() throws IOException {
+    final int size = 16 * 1024;
+    final byte[] raw = new byte[size];
+    new Random(42).nextBytes(raw);
+
+    try (TrackingByteBufferAllocator allocator =
+            TrackingByteBufferAllocator.wrap(new DirectByteBufferAllocator());
+        ByteBufferReleaser releaser = new ByteBufferReleaser(allocator)) {
+      CodecFactory heapCodecFactory = new CodecFactory(new Configuration(), 
pageSize);
+      BytesInputCompressor compressor = 
heapCodecFactory.getCompressor(CompressionCodecName.LZ4_RAW);
+      BytesInputDecompressor decompressor = 
heapCodecFactory.getDecompressor(CompressionCodecName.LZ4_RAW);
+
+      BytesInput compressed = compressor.compress(BytesInput.from(raw));
+      BytesInput decompressed = decompressor.decompress(compressed, size);
+
+      BytesInput copied = decompressed.copy(releaser);
+      Assert.assertArrayEquals(raw, copied.toByteArray());
+
+      compressor.release();
+      decompressor.release();
+      heapCodecFactory.release();
+    }
+  }
+
   @Test
   public void TestDecompressorInvalidState() throws IOException {
     // Create a mock Decompressor that returns 0 when decompress is called.

Reply via email to