This is an automated email from the ASF dual-hosted git repository.
wgtmac pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-java.git
The following commit(s) were added to refs/heads/master by this push:
new 4f868efbc GH-3011: Deny further writes after InternalParquetRecordWriter is aborted (#3450)
4f868efbc is described below
commit 4f868efbc351ff57c0a2b14183b6208c17552a78
Author: YangJie <[email protected]>
AuthorDate: Mon Apr 20 09:43:50 2026 +0800
GH-3011: Deny further writes after InternalParquetRecordWriter is aborted (#3450)
---
.../hadoop/InternalParquetRecordWriter.java | 3 ++
.../parquet/hadoop/TestParquetWriterError.java | 46 ++++++++++++++++++++++
2 files changed, 49 insertions(+)
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
index 41b068d01..4cb0e002d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
@@ -152,6 +152,9 @@ class InternalParquetRecordWriter<T> {
}
public void write(T value) throws IOException, InterruptedException {
+ if (aborted) {
+ throw new IOException("Writer has been aborted due to a previous error
and cannot accept further writes");
+ }
try {
writeSupport.write(value);
++recordCount;
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterError.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterError.java
index 51f8a7dd6..c747f9933 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterError.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterError.java
@@ -29,14 +29,18 @@ import java.util.Map;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.bytes.DirectByteBufferAllocator;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.apache.parquet.bytes.TrackingByteBufferAllocator;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
import org.apache.parquet.hadoop.codec.CleanUtil;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.LocalOutputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -58,6 +62,48 @@ public class TestParquetWriterError {
@Rule
public TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Test
+  public void testWriteAfterAbortShouldThrow() throws Exception {
+    java.nio.file.Path outputFile = tmpFolder.newFile("abort_test.parquet").toPath();
+    MessageType schema =
+        MessageTypeParser.parseMessageType("message test { required binary name; required int32 age; }");
+    SimpleGroupFactory groupFactory = new SimpleGroupFactory(schema);
+
+    // Resources close in reverse declaration order: the writer first, then the tracking
+    // allocator it borrowed buffers from. Declaring the writer as a resource guarantees it
+    // is closed even when an assertion below fails. Closing an aborted writer is expected
+    // to skip flushing and must not throw; an exception from the implicit close() fails the test.
+    try (TrackingByteBufferAllocator allocator = TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator());
+        ParquetWriter<Group> writer = ExampleParquetWriter.builder(new LocalOutputFile(outputFile))
+            .withType(schema)
+            .withAllocator(allocator)
+            .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+            .build()) {
+      // Write one valid record
+      writer.write(groupFactory.newGroup().append("name", "Alice").append("age", 30));
+
+      // Simulate an aborted state by reflectively setting the aborted flag
+      // on the internal writer. This mirrors what happens when a write fails
+      // with an exception (e.g. OOM during page flush).
+      Field internalWriterField = ParquetWriter.class.getDeclaredField("writer");
+      internalWriterField.setAccessible(true);
+      InternalParquetRecordWriter<?> internalWriter =
+          (InternalParquetRecordWriter<?>) internalWriterField.get(writer);
+      Field abortedField = InternalParquetRecordWriter.class.getDeclaredField("aborted");
+      abortedField.setAccessible(true);
+      abortedField.setBoolean(internalWriter, true);
+
+      // Writing again must be rejected with an IOException whose message names the aborted state.
+      IOException e = Assert.assertThrows(
+          "Expected IOException when writing to an aborted writer",
+          IOException.class,
+          () -> writer.write(
+              groupFactory.newGroup().append("name", "Charlie").append("age", 25)));
+      Assert.assertTrue(
+          "Error message should mention aborted state", e.getMessage().contains("aborted"));
+    }
+  }
+
@Test
public void testInSeparateProcess() throws IOException, InterruptedException
{
String outputFile = tmpFolder.newFile("out.parquet").toString();