This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 94343a0271af [HUDI-9504] Fix flushing issue for flink append write with buffer sort (#13879)
94343a0271af is described below

commit 94343a0271af0c0cd6dcacadddfb5f3bac77a1f1
Author: Shuo Cheng <[email protected]>
AuthorDate: Fri Sep 12 13:13:28 2025 +0800

    [HUDI-9504] Fix flushing issue for flink append write with buffer sort (#13879)
---
 .../append/AppendWriteFunctionWithBufferSort.java  | 56 +++++++++++++---------
 .../ITTestAppendWriteFunctionWithBufferSort.java   | 25 ++++++----
 2 files changed, 49 insertions(+), 32 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBufferSort.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBufferSort.java
index 16146ccec00f..b596c7f683eb 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBufferSort.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBufferSort.java
@@ -18,25 +18,27 @@
 
 package org.apache.hudi.sink.append;
 
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory;
+import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
+import org.apache.hudi.sink.utils.BufferUtils;
+import org.apache.hudi.util.MutableIteratorWrapperIterator;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.sort.QuickSort;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
 import org.apache.flink.table.runtime.generated.GeneratedNormalizedKeyComputer;
 import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
 import org.apache.flink.table.runtime.operators.sort.BinaryInMemorySortBuffer;
 import org.apache.flink.table.runtime.util.MemorySegmentPool;
-import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory;
-import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
-import org.apache.hudi.sink.utils.BufferUtils;
-import org.apache.flink.runtime.operators.sort.QuickSort;
-import org.apache.hudi.util.MutableIteratorWrapperIterator;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -83,6 +85,7 @@ public class AppendWriteFunctionWithBufferSort<T> extends AppendWriteFunction<T>
             memorySegmentPool,
             keyComputer.newInstance(Thread.currentThread().getContextClassLoader()),
             recordComparator.newInstance(Thread.currentThread().getContextClassLoader()));
+    LOG.info("{} is initialized successfully.", getClass().getSimpleName());
   }
 
   @Override
@@ -97,7 +100,7 @@ public class AppendWriteFunctionWithBufferSort<T> extends AppendWriteFunction<T>
       // 3. write the row again
       success = buffer.write(data);
       if (!success) {
-        throw new RuntimeException("Buffer is too small to hold a single record.");
+        throw new HoodieException("Buffer is too small to hold a single record.");
       }
     }
 
@@ -111,27 +114,34 @@ public class AppendWriteFunctionWithBufferSort<T> extends AppendWriteFunction<T>
     try {
       sortAndSend();
     } catch (IOException e) {
-      LOG.error("Fail to sort and flush data in buffer during snapshot state.", e);
-      throw new FlinkRuntimeException(e);
+      throw new HoodieIOException("Fail to sort and flush data in buffer during snapshot state.", e);
     }
     super.snapshotState();
   }
 
+  @Override
+  public void endInput() {
+    try {
+      sortAndSend();
+    } catch (IOException e) {
+      throw new HoodieIOException("Fail to sort and flush data in buffer during endInput.", e);
+    }
+    super.endInput();
+  }
+
   /**
-   *  For append writing, the flushing can be triggered with two conditions:
-   *  1. Checkpoint trigger. in which current remaining data in buffer are flushed and committed.
-   *  2. Binary buffer is full.
-   *
-   *  set the size of buffer as the max size of parquet file, e.g., PARQUET_MAX_FILE_SIZE, to keep aligned
-   *  with current behavior considering the file's size / quantity.
-   *
-   * @throws IOException
+   * For append writing, the flushing can be triggered with the following conditions:
+   * 1. Checkpoint trigger, in which the current remaining data in buffer are flushed and committed.
+   * 2. Binary buffer is full.
+   * 3. `endInput` is called for pipelines with a bounded source.
    */
   private void sortAndSend() throws IOException {
+    if (buffer.isEmpty()) {
+      return;
+    }
     if (this.writerHelper == null) {
       initWriterHelper();
     }
-
     sort(buffer);
     Iterator<BinaryRowData> iterator =
             new MutableIteratorWrapperIterator<>(
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithBufferSort.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithBufferSort.java
index 3223561c4579..38ad5487762a 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithBufferSort.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithBufferSort.java
@@ -36,6 +36,8 @@ import org.apache.flink.table.data.TimestampData;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
 import java.sql.Timestamp;
@@ -90,11 +92,12 @@ public class ITTestAppendWriteFunctionWithBufferSort extends TestWriteBase {
 
     // Verify all data was written
     List<GenericRecord> actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1);
-    assertEquals(100, actualData.size());
+    assertEquals(150, actualData.size());
   }
 
-  @Test
-  public void testBufferFlushOnCheckpoint() throws Exception {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testBufferFlush(boolean flushOnCheckpoint) throws Exception {
     // Create test data
     List<RowData> inputData = Arrays.asList(
         createRowData("uuid1", "Bob", 30, "1970-01-01 00:00:01.123", "p1"),
@@ -102,11 +105,15 @@ public class ITTestAppendWriteFunctionWithBufferSort extends TestWriteBase {
     );
 
     // Write the data and wait for timer
-    TestWriteBase.TestHarness.instance()
-        .preparePipeline(tempFile, conf)
-        .consume(inputData)
-        .checkpoint(1)
-        .endInput();
+    TestHarness testHarness =
+        TestWriteBase.TestHarness.instance()
+            .preparePipeline(tempFile, conf)
+            .consume(inputData);
+    if (flushOnCheckpoint) {
+      testHarness.checkpoint(1);
+    } else {
+      testHarness.endInput();
+    }
 
     // Verify data was written
     List<GenericRecord> actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1);
@@ -134,7 +141,7 @@ public class ITTestAppendWriteFunctionWithBufferSort extends TestWriteBase {
 
     // Verify all data was written
     List<GenericRecord> actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1);
-    assertEquals(1198, actualData.size());
+    assertEquals(2000, actualData.size());
   }
 
   @Test

Reply via email to