This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 94343a0271af [HUDI-9504] Fix flushing issue for flink append write
with buffer sort (#13879)
94343a0271af is described below
commit 94343a0271af0c0cd6dcacadddfb5f3bac77a1f1
Author: Shuo Cheng <[email protected]>
AuthorDate: Fri Sep 12 13:13:28 2025 +0800
[HUDI-9504] Fix flushing issue for flink append write with buffer sort
(#13879)
---
.../append/AppendWriteFunctionWithBufferSort.java | 56 +++++++++++++---------
.../ITTestAppendWriteFunctionWithBufferSort.java | 25 ++++++----
2 files changed, 49 insertions(+), 32 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBufferSort.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBufferSort.java
index 16146ccec00f..b596c7f683eb 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBufferSort.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBufferSort.java
@@ -18,25 +18,27 @@
package org.apache.hudi.sink.append;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory;
+import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
+import org.apache.hudi.sink.utils.BufferUtils;
+import org.apache.hudi.util.MutableIteratorWrapperIterator;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.sort.QuickSort;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
import org.apache.flink.table.runtime.generated.GeneratedNormalizedKeyComputer;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.operators.sort.BinaryInMemorySortBuffer;
import org.apache.flink.table.runtime.util.MemorySegmentPool;
-import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
-import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory;
-import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
-import org.apache.hudi.sink.utils.BufferUtils;
-import org.apache.flink.runtime.operators.sort.QuickSort;
-import org.apache.hudi.util.MutableIteratorWrapperIterator;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,6 +85,7 @@ public class AppendWriteFunctionWithBufferSort<T> extends
AppendWriteFunction<T>
memorySegmentPool,
keyComputer.newInstance(Thread.currentThread().getContextClassLoader()),
recordComparator.newInstance(Thread.currentThread().getContextClassLoader()));
+ LOG.info("{} is initialized successfully.", getClass().getSimpleName());
}
@Override
@@ -97,7 +100,7 @@ public class AppendWriteFunctionWithBufferSort<T> extends
AppendWriteFunction<T>
// 3. write the row again
success = buffer.write(data);
if (!success) {
- throw new RuntimeException("Buffer is too small to hold a single record.");
+ throw new HoodieException("Buffer is too small to hold a single record.");
}
}
@@ -111,27 +114,34 @@ public class AppendWriteFunctionWithBufferSort<T> extends
AppendWriteFunction<T>
try {
sortAndSend();
} catch (IOException e) {
- LOG.error("Fail to sort and flush data in buffer during snapshot state.", e);
- throw new FlinkRuntimeException(e);
+ throw new HoodieIOException("Fail to sort and flush data in buffer during snapshot state.", e);
}
super.snapshotState();
}
+ @Override
+ public void endInput() {
+ try {
+ sortAndSend();
+ } catch (IOException e) {
+ throw new HoodieIOException("Fail to sort and flush data in buffer during endInput.", e);
+ }
+ super.endInput();
+ }
+
/**
- * For append writing, the flushing can be triggered with two conditions:
- * 1. Checkpoint trigger. in which current remaining data in buffer are flushed and committed.
- * 2. Binary buffer is full.
- *
- * set the size of buffer as the max size of parquet file, e.g., PARQUET_MAX_FILE_SIZE, to keep aligned
- * with current behavior considering the file's size / quantity.
- *
- * @throws IOException
+ * For append writing, the flushing can be triggered with the following conditions:
+ * 1. Checkpoint trigger, in which the current remaining data in buffer are flushed and committed.
+ * 2. Binary buffer is full.
+ * 3. `endInput` is called for pipelines with a bounded source.
*/
private void sortAndSend() throws IOException {
+ if (buffer.isEmpty()) {
+ return;
+ }
if (this.writerHelper == null) {
initWriterHelper();
}
-
sort(buffer);
Iterator<BinaryRowData> iterator =
new MutableIteratorWrapperIterator<>(
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithBufferSort.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithBufferSort.java
index 3223561c4579..38ad5487762a 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithBufferSort.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithBufferSort.java
@@ -36,6 +36,8 @@ import org.apache.flink.table.data.TimestampData;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.sql.Timestamp;
@@ -90,11 +92,12 @@ public class ITTestAppendWriteFunctionWithBufferSort
extends TestWriteBase {
// Verify all data was written
List<GenericRecord> actualData = TestData.readAllData(new
File(conf.get(FlinkOptions.PATH)), rowType, 1);
- assertEquals(100, actualData.size());
+ assertEquals(150, actualData.size());
}
- @Test
- public void testBufferFlushOnCheckpoint() throws Exception {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testBufferFlush(boolean flushOnCheckpoint) throws Exception {
// Create test data
List<RowData> inputData = Arrays.asList(
createRowData("uuid1", "Bob", 30, "1970-01-01 00:00:01.123", "p1"),
@@ -102,11 +105,15 @@ public class ITTestAppendWriteFunctionWithBufferSort
extends TestWriteBase {
);
// Write the data and wait for timer
- TestWriteBase.TestHarness.instance()
- .preparePipeline(tempFile, conf)
- .consume(inputData)
- .checkpoint(1)
- .endInput();
+ TestHarness testHarness =
+ TestWriteBase.TestHarness.instance()
+ .preparePipeline(tempFile, conf)
+ .consume(inputData);
+ if (flushOnCheckpoint) {
+ testHarness.checkpoint(1);
+ } else {
+ testHarness.endInput();
+ }
// Verify data was written
List<GenericRecord> actualData = TestData.readAllData(new
File(conf.get(FlinkOptions.PATH)), rowType, 1);
@@ -134,7 +141,7 @@ public class ITTestAppendWriteFunctionWithBufferSort
extends TestWriteBase {
// Verify all data was written
List<GenericRecord> actualData = TestData.readAllData(new
File(conf.get(FlinkOptions.PATH)), rowType, 1);
- assertEquals(1198, actualData.size());
+ assertEquals(2000, actualData.size());
}
@Test