This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch release-0.12.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 0b244193761d446e26670c64b68f6be61b5d64d3 Author: HunterXHunter <[email protected]> AuthorDate: Thu Aug 4 20:28:28 2022 +0800 [HUDI-4385] Support online compaction in the flink batch mode write (#6093) * [HUDI-4385] Support online compaction in the flink batch mode write Signed-off-by: HunterXHunter <[email protected]> --- .../hudi/sink/StreamWriteOperatorCoordinator.java | 5 +++++ .../hudi/sink/compact/CompactionPlanOperator.java | 9 ++++++++- .../java/org/apache/hudi/table/HoodieTableSink.java | 4 ++++ .../hudi/sink/compact/ITTestHoodieFlinkCompactor.java | 18 ++++++++++++++++++ 4 files changed, 35 insertions(+), 1 deletion(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 7d20789bff..e3b0d82704 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -424,6 +424,11 @@ public class StreamWriteOperatorCoordinator Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); // sync Hive synchronously if it is enabled in batch mode. 
syncHive(); + // schedules the compaction plan in batch execution mode + if (tableState.scheduleCompaction) { + // if async compaction is on, schedule the compaction + CompactionUtil.scheduleCompaction(metaClient, writeClient, tableState.isDeltaTimeCompaction, true); + } } } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java index d5e718883b..efaf4bfdc7 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java @@ -18,6 +18,7 @@ package org.apache.hudi.sink.compact; +import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.common.model.CompactionOperation; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -47,7 +48,7 @@ import static java.util.stream.Collectors.toList; * <p>It should be singleton to avoid conflicts. */ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPlanEvent> - implements OneInputStreamOperator<Object, CompactionPlanEvent> { + implements OneInputStreamOperator<Object, CompactionPlanEvent>, BoundedOneInput { /** * Config options. @@ -141,4 +142,10 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla public void setOutput(Output<StreamRecord<CompactionPlanEvent>> output) { this.output = output; } + + @Override + public void endInput() throws Exception { + // Called when the input data ends, only used in batch mode. 
+ notifyCheckpointComplete(-1); + } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java index d579027911..5af86867d8 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java @@ -95,6 +95,10 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream); // compaction if (OptionsResolver.needsAsyncCompaction(conf)) { + // use synchronous compaction for bounded source. + if (context.isBounded()) { + conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); + } return Pipelines.compact(conf, pipeline); } else { return Pipelines.clean(conf, pipeline); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java index d8ffd6b1b1..a8b78ab64d 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java @@ -42,6 +42,7 @@ import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.api.internal.TableEnvironmentImpl; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -288,6 +289,23 @@ public class ITTestHoodieFlinkCompactor { TestData.checkWrittenDataCOW(tempFile, EXPECTED3); } + 
@Test + public void testCompactionInBatchExecutionMode() throws Exception { + EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); + TableEnvironment tableEnv = TableEnvironmentImpl.create(settings); + tableEnv.getConfig().getConfiguration() + .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1); + Map<String, String> options = new HashMap<>(); + options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "2"); + options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); + options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ"); + String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); + tableEnv.executeSql(hoodieTableDDL); + tableEnv.executeSql(TestSQL.INSERT_T1).await(); + tableEnv.executeSql(TestSQL.UPDATE_INSERT_T1).await(); + TestData.checkWrittenDataCOW(tempFile, EXPECTED2); + } + private String scheduleCompactionPlan(HoodieTableMetaClient metaClient, HoodieFlinkWriteClient<?> writeClient) { boolean scheduled = false; // judge whether there are any compaction operations.
