This is an automated email from the ASF dual-hosted git repository.

wgtmac pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-java.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f868efbc GH-3011: Deny further writes after InternalParquetRecordWriter is aborted (#3450)
4f868efbc is described below

commit 4f868efbc351ff57c0a2b14183b6208c17552a78
Author: YangJie <[email protected]>
AuthorDate: Mon Apr 20 09:43:50 2026 +0800

    GH-3011: Deny further writes after InternalParquetRecordWriter is aborted (#3450)
---
 .../hadoop/InternalParquetRecordWriter.java        |  3 ++
 .../parquet/hadoop/TestParquetWriterError.java     | 46 ++++++++++++++++++++++
 2 files changed, 49 insertions(+)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
index 41b068d01..4cb0e002d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
@@ -152,6 +152,9 @@ class InternalParquetRecordWriter<T> {
   }
 
   public void write(T value) throws IOException, InterruptedException {
+    if (aborted) {
+      throw new IOException("Writer has been aborted due to a previous error and cannot accept further writes");
+    }
     try {
       writeSupport.write(value);
       ++recordCount;
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterError.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterError.java
index 51f8a7dd6..c747f9933 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterError.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterError.java
@@ -29,14 +29,18 @@ import java.util.Map;
 import java.util.Random;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.bytes.DirectByteBufferAllocator;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
 import org.apache.parquet.bytes.TrackingByteBufferAllocator;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
 import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
 import org.apache.parquet.hadoop.codec.CleanUtil;
 import org.apache.parquet.hadoop.example.ExampleParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.io.LocalOutputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -58,6 +62,48 @@ public class TestParquetWriterError {
   @Rule
   public TemporaryFolder tmpFolder = new TemporaryFolder();
 
+  @Test
+  public void testWriteAfterAbortShouldThrow() throws Exception {
+    java.nio.file.Path outputFile = tmpFolder.newFile("abort_test.parquet").toPath();
+    MessageType schema =
+        MessageTypeParser.parseMessageType("message test { required binary name; required int32 age; }");
+    SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema);
+
+    try (TrackingByteBufferAllocator allocator = TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator())) {
+      ParquetWriter<Group> writer = ExampleParquetWriter.builder(new LocalOutputFile(outputFile))
+          .withType(schema)
+          .withAllocator(allocator)
+          .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+          .build();
+
+      // Write one valid record
+      writer.write(groupFactory.newGroup().append("name", "Alice").append("age", 30));
+
+      // Simulate an aborted state by reflectively setting the aborted flag
+      // on the internal writer. This mirrors what happens when a write fails
+      // with an exception (e.g. OOM during page flush).
+      Field internalWriterField = ParquetWriter.class.getDeclaredField("writer");
+      internalWriterField.setAccessible(true);
+      InternalParquetRecordWriter<?> internalWriter =
+          (InternalParquetRecordWriter<?>) internalWriterField.get(writer);
+      Field abortedField = InternalParquetRecordWriter.class.getDeclaredField("aborted");
+      abortedField.setAccessible(true);
+      abortedField.setBoolean(internalWriter, true);
+
+      // Now try to write again - this should throw IOException
+      IOException e = Assert.assertThrows(
+          "Expected IOException when writing to an aborted writer",
+          IOException.class,
+          () -> writer.write(
+              groupFactory.newGroup().append("name", "Charlie").append("age", 25)));
+      Assert.assertTrue(
+          "Error message should mention aborted state", e.getMessage().contains("aborted"));
+
+      // Close should not throw (it should silently skip flushing due to aborted state)
+      writer.close();
+    }
+  }
+
   @Test
   public void testInSeparateProcess() throws IOException, InterruptedException {
     String outputFile = tmpFolder.newFile("out.parquet").toString();

Reply via email to