This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c6e8d92cad [HUDI-6592] Flink insert overwrite should support dynamic partition and whole table (#9287)
8c6e8d92cad is described below

commit 8c6e8d92cad465925d5bf165deccc18229efcd1f
Author: Nicholas Jiang <[email protected]>
AuthorDate: Sun Aug 6 08:15:19 2023 +0800

    [HUDI-6592] Flink insert overwrite should support dynamic partition and whole table (#9287)
---
 .../apache/hudi/configuration/FlinkOptions.java    | 11 +++++++
 .../apache/hudi/configuration/OptionsResolver.java |  9 ++++++
 .../sink/overwrite/PartitionOverwriteMode.java     | 35 ++++++++++++++++++++++
 .../org/apache/hudi/table/HoodieTableSink.java     | 11 ++++---
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 27 +++++++++++++----
 .../test/java/org/apache/hudi/utils/TestData.java  | 16 ++++++++++
 6 files changed, 99 insertions(+), 10 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index c140d40af88..556d0b2ef2b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -39,6 +39,7 @@ import org.apache.hudi.hive.ddl.HiveSyncMode;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
+import org.apache.hudi.sink.overwrite.PartitionOverwriteMode;
 import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
 import org.apache.hudi.util.ClientIds;
 
@@ -613,6 +614,16 @@ public class FlinkOptions extends HoodieConfig {
       .defaultValue(128)
       .withDescription("Sort memory in MB, default 128MB");
 
+  @AdvancedConfig
+  public static final ConfigOption<String> WRITE_PARTITION_OVERWRITE_MODE = 
ConfigOptions
+      .key("write.partition.overwrite.mode")
+      .stringType()
+      .defaultValue(PartitionOverwriteMode.STATIC.name())
+      .withDescription("When INSERT OVERWRITE a partitioned data source table, 
we currently support 2 modes: static and dynamic. "
+          + "Static mode deletes all the partitions that match the partition 
specification(e.g. PARTITION(a=1,b)) in the INSERT statement, before 
overwriting. "
+          + "Dynamic mode doesn't delete partitions ahead, and only overwrite 
those partitions that have data written into it at runtime. "
+          + "By default we use static mode to keep the same behavior of 
previous version.");
+
   // this is only for internal use
   @AdvancedConfig
   public static final ConfigOption<String> WRITE_CLIENT_ID = ConfigOptions
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 944e795dc2f..bfde0b0e2b0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -35,6 +35,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.sink.overwrite.PartitionOverwriteMode;
 import org.apache.hudi.table.format.FilePathUtils;
 
 import org.apache.flink.configuration.ConfigOption;
@@ -241,6 +242,14 @@ public class OptionsResolver {
         || 
conf.getString(FlinkOptions.OPERATION).equalsIgnoreCase(WriteOperationType.INSERT_OVERWRITE.value());
   }
 
+  /**
+   * Returns whether the operation is INSERT OVERWRITE dynamic partition.
+   */
+  public static boolean overwriteDynamicPartition(Configuration conf) {
+    return 
conf.getString(FlinkOptions.OPERATION).equalsIgnoreCase(WriteOperationType.INSERT_OVERWRITE.value())
+        || 
conf.getString(FlinkOptions.WRITE_PARTITION_OVERWRITE_MODE).equalsIgnoreCase(PartitionOverwriteMode.DYNAMIC.name());
+  }
+
   /**
    * Returns whether the read start commit is specific commit timestamp.
    */
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/overwrite/PartitionOverwriteMode.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/overwrite/PartitionOverwriteMode.java
new file mode 100644
index 00000000000..700c7432e25
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/overwrite/PartitionOverwriteMode.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.overwrite;
+
+import org.apache.hudi.common.config.EnumDescription;
+import org.apache.hudi.common.config.EnumFieldDescription;
+
+/**
+ * Mode of INSERT OVERWRITE partitioned table.
+ */
+@EnumDescription("Mode of INSERT OVERWRITE a partitioned data source table.")
+public enum PartitionOverwriteMode {
+
+  @EnumFieldDescription("Deletes all the partitions that match the partition 
specification(e.g. PARTITION(a=1,b)) in the INSERT statement, before 
overwriting.")
+  STATIC,
+
+  @EnumFieldDescription("Doesn't delete partitions ahead, and only overwrite 
those partitions that have data written into it at runtime.")
+  DYNAMIC
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index ec0db6b1262..e80e2510a65 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -148,7 +148,7 @@ public class HoodieTableSink implements
   @Override
   public void applyStaticPartition(Map<String, String> partitions) {
     // #applyOverwrite should have been invoked.
-    if (this.overwrite && partitions.size() > 0) {
+    if (this.overwrite && !partitions.isEmpty()) {
       this.conf.setString(FlinkOptions.OPERATION, 
WriteOperationType.INSERT_OVERWRITE.value());
     }
   }
@@ -156,9 +156,12 @@ public class HoodieTableSink implements
   @Override
   public void applyOverwrite(boolean overwrite) {
     this.overwrite = overwrite;
-    // set up the operation as INSERT_OVERWRITE_TABLE first,
-    // if there are explicit partitions, #applyStaticPartition would overwrite 
the option.
-    this.conf.setString(FlinkOptions.OPERATION, 
WriteOperationType.INSERT_OVERWRITE_TABLE.value());
+    if (OptionsResolver.overwriteDynamicPartition(conf)) {
+      this.conf.setString(FlinkOptions.OPERATION, 
WriteOperationType.INSERT_OVERWRITE.value());
+    } else {
+      // if there are explicit partitions, #applyStaticPartition would 
overwrite the option.
+      this.conf.setString(FlinkOptions.OPERATION, 
WriteOperationType.INSERT_OVERWRITE_TABLE.value());
+    }
   }
 
   @Override
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 5fbf44062a0..4ea92fbb845 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -777,8 +777,8 @@ public class ITTestHoodieDataSource {
         () -> tableEnv.sqlQuery("select * from t1").execute().collect());
     assertRowsEquals(result2, TestData.DATA_SET_SOURCE_INSERT_OVERWRITE);
 
-    // overwrite the whole table
-    final String insertInto3 = "insert overwrite t1 values\n"
+    // overwrite the dynamic partition
+    final String insertInto3 = "insert overwrite t1 /*+ 
OPTIONS('write.partition.overwrite.mode'='dynamic') */ values\n"
         + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01', 'par1'),\n"
         + "('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02', 'par2')\n";
 
@@ -786,16 +786,31 @@ public class ITTestHoodieDataSource {
 
     List<Row> result3 = CollectionUtil.iterableToList(
         () -> tableEnv.sqlQuery("select * from t1").execute().collect());
-    final String expected = "["
-        + "+I[id1, Danny, 24, 1970-01-01T00:00:01, par1], "
-        + "+I[id2, Stephen, 34, 1970-01-01T00:00:02, par2]]";
-    assertRowsEquals(result3, expected);
+    assertRowsEquals(result3, 
TestData.DATA_SET_SOURCE_INSERT_OVERWRITE_DYNAMIC_PARTITION);
 
     // execute the same statement again and check the result
     execInsertSql(tableEnv, insertInto3);
+    assertRowsEquals(result3, 
TestData.DATA_SET_SOURCE_INSERT_OVERWRITE_DYNAMIC_PARTITION);
+
+    // overwrite the whole table
+    final String insertInto4 = "insert overwrite t1 values\n"
+        + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01', 'par1'),\n"
+        + "('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02', 'par2')\n";
+
+    execInsertSql(tableEnv, insertInto4);
+
     List<Row> result4 = CollectionUtil.iterableToList(
         () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    final String expected = "["
+        + "+I[id1, Danny, 24, 1970-01-01T00:00:01, par1], "
+        + "+I[id2, Stephen, 34, 1970-01-01T00:00:02, par2]]";
     assertRowsEquals(result4, expected);
+
+    // execute the same statement again and check the result
+    execInsertSql(tableEnv, insertInto4);
+    List<Row> result5 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    assertRowsEquals(result5, expected);
   }
 
   @ParameterizedTest
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index db9cd65b9f1..65c8e82ada1 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -287,6 +287,22 @@ public class TestData {
           TimestampData.fromEpochMillis(8000), StringData.fromString("par4"))
   );
 
+  // data set of test_source.data with partition 'par1' and 'par2' overwrite
+  public static List<RowData> 
DATA_SET_SOURCE_INSERT_OVERWRITE_DYNAMIC_PARTITION = Arrays.asList(
+      insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 
24,
+          TimestampData.fromEpochMillis(1000), StringData.fromString("par1")),
+      insertRow(StringData.fromString("id2"), 
StringData.fromString("Stephen"), 34,
+          TimestampData.fromEpochMillis(2000), StringData.fromString("par2")),
+      insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 
18,
+          TimestampData.fromEpochMillis(5000), StringData.fromString("par3")),
+      insertRow(StringData.fromString("id6"), StringData.fromString("Emma"), 
20,
+          TimestampData.fromEpochMillis(6000), StringData.fromString("par3")),
+      insertRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44,
+          TimestampData.fromEpochMillis(7000), StringData.fromString("par4")),
+      insertRow(StringData.fromString("id8"), StringData.fromString("Han"), 56,
+          TimestampData.fromEpochMillis(8000), StringData.fromString("par4"))
+  );
+
   public static List<RowData> DATA_SET_UPDATE_DELETE = Arrays.asList(
       // this is update
       insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 
24,

Reply via email to