This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch release-0.12.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 0b244193761d446e26670c64b68f6be61b5d64d3
Author: HunterXHunter <[email protected]>
AuthorDate: Thu Aug 4 20:28:28 2022 +0800

    [HUDI-4385] Support online compaction in the flink batch mode write (#6093)
    
    * [HUDI-4385] Support online compaction in the flink batch mode write
    
    Signed-off-by: HunterXHunter <[email protected]>
---
 .../hudi/sink/StreamWriteOperatorCoordinator.java      |  5 +++++
 .../hudi/sink/compact/CompactionPlanOperator.java      |  9 ++++++++-
 .../java/org/apache/hudi/table/HoodieTableSink.java    |  4 ++++
 .../hudi/sink/compact/ITTestHoodieFlinkCompactor.java  | 18 ++++++++++++++++++
 4 files changed, 35 insertions(+), 1 deletion(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 7d20789bff..e3b0d82704 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -424,6 +424,11 @@ public class StreamWriteOperatorCoordinator
         Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
         // sync Hive synchronously if it is enabled in batch mode.
         syncHive();
+        // schedules the compaction plan in batch execution mode
+        if (tableState.scheduleCompaction) {
+          // if async compaction is on, schedule the compaction
+          CompactionUtil.scheduleCompaction(metaClient, writeClient, tableState.isDeltaTimeCompaction, true);
+        }
       }
     }
   }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
index d5e718883b..efaf4bfdc7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.sink.compact;
 
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.common.model.CompactionOperation;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -47,7 +48,7 @@ import static java.util.stream.Collectors.toList;
  * <p>It should be singleton to avoid conflicts.
  */
 public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPlanEvent>
-    implements OneInputStreamOperator<Object, CompactionPlanEvent> {
+    implements OneInputStreamOperator<Object, CompactionPlanEvent>, BoundedOneInput {
 
   /**
    * Config options.
@@ -141,4 +142,10 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
   public void setOutput(Output<StreamRecord<CompactionPlanEvent>> output) {
     this.output = output;
   }
+
+  @Override
+  public void endInput() throws Exception {
+    // Called when the input data ends, only used in batch mode.
+    notifyCheckpointComplete(-1);
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index d579027911..5af86867d8 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -95,6 +95,10 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
       pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
       // compaction
       if (OptionsResolver.needsAsyncCompaction(conf)) {
+        // use synchronous compaction for bounded source.
+        if (context.isBounded()) {
+          conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+        }
         return Pipelines.compact(conf, pipeline);
       } else {
         return Pipelines.clean(conf, pipeline);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
index d8ffd6b1b1..a8b78ab64d 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
@@ -42,6 +42,7 @@ import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -288,6 +289,23 @@ public class ITTestHoodieFlinkCompactor {
     TestData.checkWrittenDataCOW(tempFile, EXPECTED3);
   }
 
+  @Test
+  public void testCompactionInBatchExecutionMode() throws Exception {
+    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+    TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+    tableEnv.getConfig().getConfiguration()
+            .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "2");
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    tableEnv.executeSql(hoodieTableDDL);
+    tableEnv.executeSql(TestSQL.INSERT_T1).await();
+    tableEnv.executeSql(TestSQL.UPDATE_INSERT_T1).await();
+    TestData.checkWrittenDataCOW(tempFile, EXPECTED2);
+  }
+
  private String scheduleCompactionPlan(HoodieTableMetaClient metaClient, HoodieFlinkWriteClient<?> writeClient) {
     boolean scheduled = false;
     // judge whether there are any compaction operations.

Reply via email to