This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b6957f020d0 [HUDI-9205] Refactor Flink tests to avoid sleeping for data results (#13027)
b6957f020d0 is described below

commit b6957f020d00aca9d5c83f51b8e8f0bd102ac9bd
Author: Shuo Cheng <njucs...@gmail.com>
AuthorDate: Thu Mar 27 17:08:09 2025 +0800

    [HUDI-9205] Refactor Flink tests to avoid sleeping for data results (#13027)
---
 .../apache/hudi/sink/ITTestDataStreamWrite.java    |  56 ++-----
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 172 +++++++++++----------
 .../apache/hudi/table/ITTestSchemaEvolution.java   |   4 -
 .../org/apache/hudi/utils/FlinkMiniCluster.java    |  11 ++
 .../org/apache/hudi/utils/TestConfigurations.java  |  15 +-
 .../utils/factory/CollectSinkTableFactory.java     |  62 +++++++-
 .../org/apache/hudi/storage/HoodieStorage.java     |   5 +-
 packaging/bundle-validation/flink/insert.sql       |   2 +
 packaging/bundle-validation/validate.sh            |   2 +-
 9 files changed, 191 insertions(+), 138 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 41912c4f84e..f0aa1a9e8c6 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -41,20 +41,15 @@ import org.apache.hudi.utils.TestData;
 import org.apache.hudi.utils.TestUtils;
 import org.apache.hudi.utils.source.ContinuousFileSource;
 
-import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.io.FilePathFilter;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.io.TextInputFormat;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -63,7 +58,6 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.TestLogger;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
@@ -167,6 +161,8 @@ public class ITTestDataStreamWrite extends TestLogger {
     conf.setString(FlinkOptions.INDEX_TYPE, indexType);
     conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
     conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+    // use synchronized compaction to ensure flink job finishing with compaction completed.
+    conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
     conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
 
     defaultWriteAndCheckExpected(conf, "mor_write_with_compact", 1);
@@ -186,11 +182,13 @@ public class ITTestDataStreamWrite extends TestLogger {
     writeWithClusterAndCheckExpected(conf, "cow_write_with_cluster", 1, EXPECTED);
   }
 
-  @Disabled("HUDI-9196")
   @ParameterizedTest
   @ValueSource(strings = {"COPY_ON_WRITE", "MERGE_ON_READ"})
   public void testStreamWriteWithIndexBootstrap(String tableType) throws Exception {
     Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString());
+    // use synchronized compaction to avoid sleeping for async compact.
+    conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
+    conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
     conf.setString(FlinkOptions.TABLE_TYPE, tableType);
 
     writeAndCheckExpected(
@@ -267,26 +265,14 @@ public class ITTestDataStreamWrite extends TestLogger {
     boolean isMor = conf.getString(FlinkOptions.TABLE_TYPE).equals(HoodieTableType.MERGE_ON_READ.name());
 
     DataStream<RowData> dataStream;
-    if (isMor) {
-      TextInputFormat format = new TextInputFormat(new Path(sourcePath));
-      format.setFilesFilter(FilePathFilter.createDefaultFilter());
-      TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
-      format.setCharsetName("UTF-8");
-
-      dataStream = execEnv
-          // use PROCESS_CONTINUOUSLY mode to trigger checkpoint
-          .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
-          .map(JsonDeserializationFunction.getInstance(rowType))
-          .setParallelism(1);
-    } else {
-      dataStream = execEnv
-          // use continuous file source to trigger checkpoint
-          .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), checkpoints))
-          .name("continuous_file_source")
-          .setParallelism(1)
-          .map(JsonDeserializationFunction.getInstance(rowType))
-          .setParallelism(4);
-    }
+
+    dataStream = execEnv
+        // use continuous file source to trigger checkpoint
+        .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), checkpoints))
+        .name("continuous_file_source")
+        .setParallelism(1)
+        .map(JsonDeserializationFunction.getInstance(rowType))
+        .setParallelism(4);
 
     if (transformer.isPresent()) {
       dataStream = transformer.get().apply(dataStream);
@@ -345,20 +331,8 @@ public class ITTestDataStreamWrite extends TestLogger {
   }
 
   public void execute(StreamExecutionEnvironment execEnv, boolean isMor, String jobName) throws Exception {
-    if (isMor) {
-      JobClient client = execEnv.executeAsync(jobName);
-      if (client.getJobStatus().get() != JobStatus.FAILED) {
-        try {
-          TimeUnit.SECONDS.sleep(35); // wait long enough for the compaction to finish
-          client.cancel();
-        } catch (Throwable var1) {
-          // ignored
-        }
-      }
-    } else {
-      // wait for the streaming job to finish
-      execEnv.execute(jobName);
-    }
+    // wait for the streaming job to finish
+    execEnv.execute(jobName);
   }
 
   @Test
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 891f3bd1206..997d266fd4e 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -47,13 +47,14 @@ import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.ExceptionUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
@@ -83,6 +84,7 @@ import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.utils.TestConfigurations.catalog;
+import static org.apache.hudi.utils.TestConfigurations.getCollectSinkDDL;
 import static org.apache.hudi.utils.TestConfigurations.sql;
 import static org.apache.hudi.utils.TestData.array;
 import static org.apache.hudi.utils.TestData.assertRowsEquals;
@@ -147,19 +149,19 @@ public class ITTestHoodieDataSource {
         .option(FlinkOptions.READ_START_COMMIT, firstCommit)
         .end();
     streamTableEnv.executeSql(hoodieTableDDL);
-    List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
+    List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", TestData.DATA_SET_SOURCE_INSERT.size());
     assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
 
     // insert another batch of data
     execInsertSql(streamTableEnv, insertInto);
-    List<Row> rows2 = execSelectSql(streamTableEnv, "select * from t1", 10);
+    List<Row> rows2 = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", TestData.DATA_SET_SOURCE_INSERT.size());
     assertRowsEquals(rows2, TestData.DATA_SET_SOURCE_INSERT);
 
     streamTableEnv.getConfig().getConfiguration()
         .setBoolean("table.dynamic-table-options.enabled", true);
     // specify the start commit as earliest
-    List<Row> rows3 = execSelectSql(streamTableEnv,
-        "select * from t1/*+options('read.start-commit'='earliest')*/", 10);
+    List<Row> rows3 = execSelectSqlWithExpectedNum(streamTableEnv,
+        "select * from t1/*+options('read.start-commit'='earliest')*/", 
TestData.DATA_SET_SOURCE_INSERT.size());
     assertRowsEquals(rows3, TestData.DATA_SET_SOURCE_INSERT);
   }
 
@@ -184,18 +186,18 @@ public class ITTestHoodieDataSource {
     execInsertSql(streamTableEnv, insertInto);
 
     String firstCommit = TestUtils.getFirstCompleteInstant(tempFile.getAbsolutePath());
-    List<Row> rows = execSelectSql(streamTableEnv,
-        "select * from t1/*+options('read.start-commit'='" + firstCommit + 
"')*/", 10);
+    List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv,
+        "select * from t1/*+options('read.start-commit'='" + firstCommit + 
"')*/", TestData.DATA_SET_SOURCE_INSERT.size());
     assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
 
     // insert another batch of data
     execInsertSql(streamTableEnv, TestSQL.UPDATE_INSERT_T1);
-    List<Row> rows2 = execSelectSql(streamTableEnv, "select * from t1", 10);
+    List<Row> rows2 = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", TestData.DATA_SET_SOURCE_CHANGELOG.size());
     assertRowsEquals(rows2, TestData.DATA_SET_SOURCE_CHANGELOG);
 
     // specify the start commit as earliest
-    List<Row> rows3 = execSelectSql(streamTableEnv,
-        "select * from t1/*+options('read.start-commit'='earliest')*/", 10);
+    List<Row> rows3 = execSelectSqlWithExpectedNum(streamTableEnv,
+        "select * from t1/*+options('read.start-commit'='earliest')*/", 
TestData.DATA_SET_SOURCE_MERGED.size());
     assertRowsEquals(rows3, TestData.DATA_SET_SOURCE_MERGED);
   }
 
@@ -218,12 +220,12 @@ public class ITTestHoodieDataSource {
     execInsertSql(streamTableEnv, insertInto);
 
     // reading from the latest commit instance.
-    List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
+    List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT.size());
     assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
 
     // insert another batch of data
     execInsertSql(streamTableEnv, insertInto);
-    List<Row> rows2 = execSelectSql(streamTableEnv, "select * from t1", 10);
+    List<Row> rows2 = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT.size());
     assertRowsEquals(rows2, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
   }
 
@@ -259,7 +261,7 @@ public class ITTestHoodieDataSource {
         .option(FlinkOptions.READ_START_COMMIT, specifiedCommit)
         .end();
     streamTableEnv.executeSql(createHoodieTable2);
-    List<Row> rows = execSelectSql(streamTableEnv, "select * from t2", 10);
+    List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t2", TestData.DATA_SET_SOURCE_MERGED.size());
     // all the data with same keys are appended within one data bucket and one log file,
     // so when consume, the same keys are merged
     assertRowsEquals(rows, TestData.DATA_SET_SOURCE_MERGED);
@@ -360,7 +362,7 @@ public class ITTestHoodieDataSource {
     streamTableEnv.getConfig().getConfiguration()
         .setBoolean("table.dynamic-table-options.enabled", true);
     final String query = String.format("select * from t1/*+ options('read.start-commit'='%s')*/", instant);
-    List<Row> rows = execSelectSql(streamTableEnv, query, 10);
+    List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, query, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT.size());
     assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
   }
 
@@ -388,7 +390,7 @@ public class ITTestHoodieDataSource {
     streamTableEnv.getConfig().getConfiguration()
         .setBoolean("table.dynamic-table-options.enabled", true);
     final String query = String.format("select * from t1/*+ options('read.start-commit'='%s')*/", instant);
-    List<Row> rows = execSelectSql(streamTableEnv, query, 10);
+    List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, query, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT.size());
     assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
   }
 
@@ -416,7 +418,7 @@ public class ITTestHoodieDataSource {
     final String query = String.format("select * from t1/*+ options('read.start-commit'='%s')*/",
         FlinkOptions.START_COMMIT_EARLIEST);
 
-    List<Row> rows = execSelectSql(streamTableEnv, query, 10);
+    List<Row> rows = execSelectSql(streamTableEnv, query);
     // batch read will not lose data when cleaned clustered files.
     assertRowsEquals(rows, CollectionUtils.combine(TestData.DATA_SET_SOURCE_INSERT_FIRST_COMMIT,
         TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT));
@@ -502,10 +504,10 @@ public class ITTestHoodieDataSource {
         + "  name varchar(20),\n"
         + "  age_sum int\n"
         + ") with (\n"
-        + "  'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'"
+        + "  'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "',\n"
+        + "  'sink-expected-row-num' = '2'"
         + ")";
-    List<Row> result = execSelectSql(streamTableEnv,
-        "select name, sum(age) from t1 group by name", sinkDDL, 10);
+    List<Row> result = execSelectSqlWithExpectedNum(streamTableEnv, "select name, sum(age) from t1 group by name", sinkDDL);
     final String expected = "[+I(+I[Danny, 24]), +I(+I[Stephen, 34])]";
     assertRowsEquals(result, expected, true);
   }
@@ -556,8 +558,9 @@ public class ITTestHoodieDataSource {
                 + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4]]",
             "[+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], "
                 + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4]]");
+    List<Integer> expectedNums = Arrays.asList(8, 3, 2);
     for (int i = 0; i < sqls.size(); i++) {
-      List<Row> result = execSelectSql(streamTableEnv, sqls.get(i), 10);
+      List<Row> result = execSelectSqlWithExpectedNum(streamTableEnv, sqls.get(i), expectedNums.get(i));
       assertRowsEquals(result, expectResults.get(i));
     }
   }
@@ -583,11 +586,11 @@ public class ITTestHoodieDataSource {
         .end();
     streamTableEnv.executeSql(hoodieTableDDL);
 
-    List<Row> result = execSelectSql(streamTableEnv,
-        "select * from t1 where `partition`='par1'", 10);
     final String expected = "["
         + "+I(+I[id1, Danny, 23, 1970-01-01T00:00:00.001, par1]), "
         + "+I(+I[id2, Stephen, 33, 1970-01-01T00:00:00.002, par1])]";
+    List<Row> result = execSelectSqlWithExpectedNum(streamTableEnv,
+        "select * from t1 where `partition`='par1'", 2);
     assertRowsEquals(result, expected, true);
   }
 
@@ -612,18 +615,8 @@ public class ITTestHoodieDataSource {
 
     String insertInto = "insert into t1 select * from source";
     execInsertSql(streamTableEnv, insertInto);
-
-    List<Row> result = execSelectSql(streamTableEnv, "select * from t1", 10);
-    final String expected = "["
-        + "+I[id1, Danny, 23, 1970-01-01T00:00:01, par1], "
-        + "+I[id2, Stephen, 33, 1970-01-01T00:00:02, par1], "
-        + "+I[id3, Julian, 53, 1970-01-01T00:00:03, par2], "
-        + "+I[id4, Fabian, 31, 1970-01-01T00:00:04, par2], "
-        + "+I[id5, Sophia, 18, 1970-01-01T00:00:05, par3], "
-        + "+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], "
-        + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], "
-        + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]";
-    assertRowsEquals(result, expected);
+    List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", TestData.DATA_SET_SOURCE_INSERT.size());
+    assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
   }
 
   @ParameterizedTest
@@ -821,13 +814,12 @@ public class ITTestHoodieDataSource {
         + "+I[id7, Bob, 44, par4, 1970-01-01T00:00:07], "
         + "+I[id8, Han, 56, par4, 1970-01-01T00:00:08]]";
 
-    List<Row> result = execSelectSql(streamTableEnv, "select * from t1", execMode);
-
+    List<Row> result = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", "t1", 8);
     assertRowsEquals(result, expected);
 
     // insert another batch of data
     execInsertSql(streamTableEnv, insertInto);
-    List<Row> result2 = execSelectSql(streamTableEnv, "select * from t1", execMode);
+    List<Row> result2 = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", "t1", 8);
     assertRowsEquals(result2, expected);
   }
 
@@ -861,12 +853,12 @@ public class ITTestHoodieDataSource {
         + "+I[4, Fabian, 2021-12-04T15:16:04.400004], "
         + "+I[5, Tom, 2721-12-04T15:16:04.500005]]";
 
-    List<Row> result = execSelectSql(streamTableEnv, "select * from t1", execMode);
+    List<Row> result = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", "t1", 5);
     assertRowsEquals(result, expected);
 
     // insert another batch of data
     execInsertSql(streamTableEnv, insertInto);
-    List<Row> result2 = execSelectSql(streamTableEnv, "select * from t1", execMode);
+    List<Row> result2 = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", "t1", 5);
     assertRowsEquals(result2, expected);
   }
 
@@ -959,7 +951,7 @@ public class ITTestHoodieDataSource {
     execInsertSql(streamTableEnv, insertInto);
 
     // reading from the earliest commit instance.
-    List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 20);
+    List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", TestData.DATA_SET_SOURCE_INSERT.size());
     assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
   }
 
@@ -1162,9 +1154,9 @@ public class ITTestHoodieDataSource {
         .end();
     batchTableEnv.executeSql(createHoodieTable);
 
-    // no exception expects to be thrown
-    assertThrows(Exception.class,
-        () -> execSelectSql(batchTableEnv, "select * from t1", 10),
+    // ValidationException expects to be thrown
+    assertThrows(ValidationException.class,
+        () -> execSelectSql(batchTableEnv, "select * from t1"),
         "Exception should throw when querying non-exists table in batch mode");
 
     // case2: empty table without data files
@@ -1221,7 +1213,10 @@ public class ITTestHoodieDataSource {
         + "+I[109, 9000, spare tire, 22.200000762939453], "
         + "+I[110, 14000, jacket, 0.5]]";
 
-    List<Row> result = execSelectSql(streamTableEnv, "select * from hoodie_sink", execMode);
+    List<Row> result =
+        execMode == ExecMode.STREAM
+            ? execSelectSqlWithExpectedNum(streamTableEnv, "select * from hoodie_sink", "hoodie_sink", 10)
+            : execSelectSql(streamTableEnv, "select * from hoodie_sink");
 
     assertRowsEquals(result, expected);
   }
@@ -1554,7 +1549,6 @@ public class ITTestHoodieDataSource {
     assertRowsEquals(result2.subList(result2.size() - 2, result2.size()), "[-U[1], +U[2]]");
   }
 
-  @Disabled("HUDI-9196")
   @ParameterizedTest
   @EnumSource(value = HoodieTableType.class)
   void testIncrementalReadArchivedCommits(HoodieTableType tableType) throws Exception {
@@ -2189,7 +2183,7 @@ public class ITTestHoodieDataSource {
         .end();
     batchTableEnv.executeSql(readHoodieTableDDL);
 
-    List<Row> result = execSelectSql(batchTableEnv, "select * from t1", ExecMode.BATCH);
+    List<Row> result = execSelectSql(batchTableEnv, "select * from t1");
     assertRowsEquals(result, expected1.toString());
 
     batchTableEnv.executeSql("drop table t1");
@@ -2211,7 +2205,7 @@ public class ITTestHoodieDataSource {
         .end();
     batchTableEnv.executeSql(readHoodieTableDDL);
 
-    result = execSelectSql(batchTableEnv, "select * from t1", ExecMode.BATCH);
+    result = execSelectSql(batchTableEnv, "select * from t1");
     assertRowsEquals(result, expected2.toString());
 
     batchTableEnv.executeSql("drop table t1");
@@ -2233,12 +2227,11 @@ public class ITTestHoodieDataSource {
         .end();
     batchTableEnv.executeSql(readHoodieTableDDL);
 
-    result = execSelectSql(batchTableEnv, "select * from t1", ExecMode.BATCH);
+    result = execSelectSql(batchTableEnv, "select * from t1");
     assertRowsEquals(result, expected3.toString());
 
   }
 
-  @Disabled("HUDI-9196")
   @ParameterizedTest
   @MethodSource("tableTypeAndBooleanTrueFalseParams")
   void testDynamicPartitionPrune(HoodieTableType tableType, boolean hiveStylePartitioning) throws Exception {
@@ -2262,11 +2255,11 @@ public class ITTestHoodieDataSource {
     // launch a streaming query
     TableResult tableResult = submitSelectSql(streamTableEnv,
         "select uuid, name, age, ts, `partition` as part from t1 where 
`partition` > 'par4'",
-        TestConfigurations.getCollectSinkDDL("sink"));
+        TestConfigurations.getCollectSinkDDLWithExpectedNum("sink", 
TestData.DATA_SET_INSERT_SEPARATE_PARTITION.size()));
     // write second commit
     TestData.writeData(TestData.DATA_SET_INSERT_SEPARATE_PARTITION, conf);
     // stop the streaming query and get data
-    List<Row> actualResult = fetchResult(streamTableEnv, tableResult, 10);
+    List<Row> actualResult = fetchResultWithExpectedNum(streamTableEnv, tableResult);
     assertRowsEquals(actualResult, TestData.DATA_SET_INSERT_SEPARATE_PARTITION);
   }
 
@@ -2326,8 +2319,7 @@ public class ITTestHoodieDataSource {
 
   @ParameterizedTest
   @MethodSource("parametersForMetaColumnsSkip")
-  void testWriteWithoutMetaColumns(HoodieTableType tableType, WriteOperationType operation)
-      throws TableNotExistException, InterruptedException {
+  void testWriteWithoutMetaColumns(HoodieTableType tableType, WriteOperationType operation) throws Exception {
     String createSource = TestConfigurations.getFileSourceDDL("source");
     streamTableEnv.executeSql(createSource);
 
@@ -2346,7 +2338,7 @@ public class ITTestHoodieDataSource {
         .option(FlinkOptions.TABLE_TYPE, tableType)
         .end();
     streamTableEnv.executeSql(hoodieTableDDL);
-    List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
+    List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", TestData.DATA_SET_SOURCE_INSERT.size());
     assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
   }
 
@@ -2402,7 +2394,7 @@ public class ITTestHoodieDataSource {
     execInsertSql(streamTableEnv, insertInto);
 
     // reading from the earliest
-    List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
+    List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", TestData.DATA_SET_SOURCE_INSERT.size());
     assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
   }
 
@@ -2527,44 +2519,44 @@ public class ITTestHoodieDataSource {
     }
   }
 
-  private List<Row> execSelectSql(TableEnvironment tEnv, String select, ExecMode execMode)
-      throws TableNotExistException, InterruptedException {
-    final String[] splits = select.split(" ");
-    final String tableName = splits[splits.length - 1];
-    switch (execMode) {
-      case STREAM:
-        return execSelectSql(tEnv, select, 10, tableName);
-      case BATCH:
-        return CollectionUtil.iterableToList(
-            () -> tEnv.sqlQuery("select * from " + tableName).execute().collect());
-      default:
-        throw new AssertionError();
-    }
+  /**
+   * Use TableResult#collect() to collect results directly for bounded source.
+   */
+  private List<Row> execSelectSql(TableEnvironment tEnv, String select) {
+    return CollectionUtil.iterableToList(
+        () -> tEnv.sqlQuery(select).execute().collect());
   }
 
-  private List<Row> execSelectSql(TableEnvironment tEnv, String select, long timeout)
-      throws InterruptedException, TableNotExistException {
-    return execSelectSql(tEnv, select, timeout, null);
+  /**
+   * Use CollectTableSink to collect results with expected row number.
+   */
+  private List<Row> execSelectSqlWithExpectedNum(TableEnvironment tEnv, String select, int expectedNum) throws Exception {
+    return execSelectSqlWithExpectedNum(tEnv, select, null, expectedNum);
   }
 
-  private List<Row> execSelectSql(TableEnvironment tEnv, String select, long timeout, String sourceTable)
-      throws InterruptedException, TableNotExistException {
+  /**
+   * Use CollectTableSink to collect results with expected row number.
+   */
+  private List<Row> execSelectSqlWithExpectedNum(TableEnvironment tEnv, String select, String sourceTable, int expectedNum)
+      throws Exception {
     final String sinkDDL;
     if (sourceTable != null) {
-      // use the source table schema as the sink schema if the source table was specified, .
+      // use the source table schema as the sink schema if the source table was specified.
       ObjectPath objectPath = new ObjectPath(tEnv.getCurrentDatabase(), sourceTable);
       TableSchema schema = tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTable(objectPath).getSchema();
-      sinkDDL = TestConfigurations.getCollectSinkDDL("sink", schema);
+      sinkDDL = TestConfigurations.getCollectSinkDDLWithExpectedNum("sink", schema, expectedNum);
     } else {
-      sinkDDL = TestConfigurations.getCollectSinkDDL("sink");
+      sinkDDL = TestConfigurations.getCollectSinkDDLWithExpectedNum("sink", expectedNum);
     }
-    return execSelectSql(tEnv, select, sinkDDL, timeout);
+    return execSelectSqlWithExpectedNum(tEnv, select, sinkDDL);
   }
 
-  private List<Row> execSelectSql(TableEnvironment tEnv, String select, String sinkDDL, long timeout)
-      throws InterruptedException {
+  /**
+   * Use CollectTableSink to collect results with expected row number.
+   */
+  private List<Row> execSelectSqlWithExpectedNum(TableEnvironment tEnv, String select, String sinkDDL) {
     TableResult tableResult = submitSelectSql(tEnv, select, sinkDDL);
-    return fetchResult(tEnv, tableResult, timeout);
+    return fetchResultWithExpectedNum(tEnv, tableResult);
   }
 
   private TableResult submitSelectSql(TableEnvironment tEnv, String select, String sinkDDL) {
@@ -2574,14 +2566,28 @@ public class ITTestHoodieDataSource {
     return tableResult;
   }
 
-  private List<Row> fetchResult(TableEnvironment tEnv, TableResult tableResult, long timeout)
-      throws InterruptedException {
-    // wait for the timeout then cancels the job
+  private List<Row> execSelectSql(TableEnvironment tEnv, String select, long timeout) throws InterruptedException {
+    TableResult tableResult = submitSelectSql(tEnv, select, getCollectSinkDDL("sink"));
     TimeUnit.SECONDS.sleep(timeout);
+    // wait for the timeout then cancels the job
     tableResult.getJobClient().ifPresent(JobClient::cancel);
     tEnv.executeSql("DROP TABLE IF EXISTS sink");
     return CollectSinkTableFactory.RESULT.values().stream()
         .flatMap(Collection::stream)
         .collect(Collectors.toList());
   }
+
+  private List<Row> fetchResultWithExpectedNum(TableEnvironment tEnv, TableResult tableResult) {
+    try {
+      // wait the continuous streaming query to be terminated by forced exception with expected row number
+      // and max waiting timeout is 30s
+      tableResult.await(30, TimeUnit.SECONDS);
+    } catch (Throwable e) {
+      ExceptionUtils.assertThrowable(e, CollectSinkTableFactory.SuccessException.class);
+    }
+    tEnv.executeSql("DROP TABLE IF EXISTS sink");
+    return CollectSinkTableFactory.RESULT.values().stream()
+        .flatMap(Collection::stream)
+        .collect(Collectors.toList());
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
index c0dced5a613..5026af91a96 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
@@ -36,7 +36,6 @@ import org.apache.hudi.utils.FlinkMiniCluster;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -81,9 +80,6 @@ public class ITTestSchemaEvolution {
   public void setUp() {
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
     tEnv = StreamTableEnvironment.create(env);
-    // flink job uses child-first classloader by default, async services fired by flink job are not guaranteed
-    // to be killed right away, which then may trigger classloader leak checking exception
-    tEnv.getConfig().getConfiguration().set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
   }
 
   @Test
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/FlinkMiniCluster.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/FlinkMiniCluster.java
index bfddb43578c..da4442890e5 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/FlinkMiniCluster.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/FlinkMiniCluster.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.utils;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.AbstractTestBase;
@@ -41,6 +43,7 @@ public class FlinkMiniCluster implements BeforeAllCallback, AfterAllCallback, Af
   private static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
       new MiniClusterWithClientResource(
           new MiniClusterResourceConfiguration.Builder()
+              .setConfiguration(getDefaultConfig())
               .setNumberTaskManagers(1)
               .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
               .build());
@@ -60,6 +63,14 @@ public class FlinkMiniCluster implements BeforeAllCallback, AfterAllCallback, Af
     cleanupRunningJobs();
   }
 
+  private static Configuration getDefaultConfig() {
+    Configuration config = new Configuration();
+    // flink job uses child-first classloader by default, async services fired by flink job are not
+    // guaranteed to be killed right away, which then may trigger classloader leak checking exception.
+    config.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
+    return config;
+  }
+
   private void cleanupRunningJobs() throws Exception {
     if (!MINI_CLUSTER_RESOURCE.getMiniCluster().isRunning()) {
       // do nothing if the MiniCluster is not running
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
index 92d44a8e7ef..ed5aef764ab 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
@@ -244,6 +244,11 @@ public class TestConfigurations {
   }
 
   public static String getCollectSinkDDL(String tableName) {
+    // set expectedRowNum as -1 to disable forced exception to terminate a successful sink
+    return getCollectSinkDDLWithExpectedNum(tableName, -1);
+  }
+
+  public static String getCollectSinkDDLWithExpectedNum(String tableName, int expectedRowNum) {
     return "create table " + tableName + "(\n"
         + "  uuid varchar(20),\n"
         + "  name varchar(10),\n"
@@ -251,11 +256,16 @@ public class TestConfigurations {
         + "  ts timestamp(3),\n"
         + "  `partition` varchar(20)\n"
         + ") with (\n"
-        + "  'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'"
+        + "  'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "',\n"
+        + "  'sink-expected-row-num' = '" + expectedRowNum + "'"
         + ")";
   }
 
   public static String getCollectSinkDDL(String tableName, TableSchema tableSchema) {
+    return getCollectSinkDDLWithExpectedNum(tableName, tableSchema, -1);
+  }
+
+  public static String getCollectSinkDDLWithExpectedNum(String tableName, TableSchema tableSchema, int expectRowNum) {
     final StringBuilder builder = new StringBuilder("create table " + tableName + "(\n");
     String[] fieldNames = tableSchema.getFieldNames();
     DataType[] fieldTypes = tableSchema.getFieldDataTypes();
@@ -271,7 +281,8 @@ public class TestConfigurations {
     }
     final String withProps = ""
         + ") with (\n"
-        + "  'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'\n"
+        + "  'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "',\n"
+        + "  'sink-expected-row-num' = '" + expectRowNum + "'"
         + ")";
     builder.append(withProps);
     return builder.toString();
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java
index da0761a7542..ebce7c4ddbd 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java
@@ -18,12 +18,14 @@
 
 package org.apache.hudi.utils.factory;
 
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.util.ChangelogModes;
 
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
@@ -43,6 +45,7 @@ import org.apache.flink.types.Row;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -59,14 +62,19 @@ public class CollectSinkTableFactory implements DynamicTableSinkFactory {
   // global results to collect and query
   public static final Map<Integer, List<Row>> RESULT = new HashMap<>();
 
+  // options
+  private static final ConfigOption<Integer> SINK_EXPECTED_ROW_NUM =
+      ConfigOptions.key("sink-expected-row-num").intType().defaultValue(-1);
+
   @Override
   public DynamicTableSink createDynamicTableSink(Context context) {
     FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
     helper.validate();
 
     TableSchema schema = context.getCatalogTable().getSchema();
+    int expectRowNum = helper.getOptions().get(SINK_EXPECTED_ROW_NUM);
     RESULT.clear();
-    return new CollectTableSink(schema, context.getObjectIdentifier().getObjectName());
+    return new CollectTableSink(schema, context.getObjectIdentifier().getObjectName(), expectRowNum);
   }
 
   @Override
@@ -81,7 +89,7 @@ public class CollectSinkTableFactory implements DynamicTableSinkFactory {
 
   @Override
   public Set<ConfigOption<?>> optionalOptions() {
-    return Collections.emptySet();
+    return new HashSet<>(Collections.singletonList(SINK_EXPECTED_ROW_NUM));
   }
 
   // --------------------------------------------------------------------------------------------
@@ -95,12 +103,15 @@ public class CollectSinkTableFactory implements DynamicTableSinkFactory {
 
     private final TableSchema schema;
     private final String tableName;
+    private final int expectedRowNum;
 
     private CollectTableSink(
         TableSchema schema,
-        String tableName) {
+        String tableName,
+        int expectedRowNum) {
       this.schema = schema;
       this.tableName = tableName;
+      this.expectedRowNum = expectedRowNum;
     }
 
     @Override
@@ -113,12 +124,16 @@ public class CollectSinkTableFactory implements DynamicTableSinkFactory {
       final DataType rowType = schema.toPhysicalRowDataType();
       final RowTypeInfo rowTypeInfo = (RowTypeInfo) TypeConversions.fromDataTypeToLegacyInfo(rowType);
       DataStructureConverter converter = context.createDataStructureConverter(schema.toPhysicalRowDataType());
-      return SinkFunctionProvider.of(new CollectSinkFunction(converter, rowTypeInfo));
+      if (expectedRowNum != -1) {
+        return SinkFunctionProvider.of(new CollectSinkFunctionWithExpectedNum(converter, rowTypeInfo, expectedRowNum));
+      } else {
+        return SinkFunctionProvider.of(new CollectSinkFunction(converter, rowTypeInfo));
+      }
     }
 
     @Override
     public DynamicTableSink copy() {
-      return new CollectTableSink(schema, tableName);
+      return new CollectTableSink(schema, tableName, expectedRowNum);
     }
 
     @Override
@@ -170,7 +185,42 @@ public class CollectSinkTableFactory implements DynamicTableSinkFactory {
     @Override
     public void snapshotState(FunctionSnapshotContext context) throws Exception {
       resultState.clear();
-      resultState.addAll(RESULT.get(taskID));
+      List<Row> rows = RESULT.get(taskID);
+      if (rows != null) {
+        resultState.addAll(RESULT.get(taskID));
+      }
+    }
+  }
+
+  static class CollectSinkFunctionWithExpectedNum extends CollectSinkFunction {
+    private final int expectRowNum;
+
+    protected CollectSinkFunctionWithExpectedNum(
+        DynamicTableSink.DataStructureConverter converter,
+        RowTypeInfo rowTypeInfo,
+        int expectRowNum) {
+      super(converter, rowTypeInfo);
+      ValidationUtils.checkArgument(expectRowNum > 0, "Expected row number should be positive.");
+      this.expectRowNum = expectRowNum;
+    }
+
+    @Override
+    public void invoke(RowData value, Context context) {
+      super.invoke(value, context);
+      if (RESULT.values().stream().mapToInt(List::size).sum() >= expectRowNum) {
+        throw new SuccessException();
+      }
+    }
+  }
+
+  /**
+   * Exception that is thrown to terminate a program and indicate success.
+   */
+  public static class SuccessException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
+
+    public SuccessException() {
+      super("Forced exception to terminate a successful sink.");
     }
   }
 }
diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
index d11bd7fc6e9..adcbf6a630f 100644
--- a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
@@ -389,7 +389,10 @@ public abstract class HoodieStorage implements Closeable {
    */
   @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public final boolean needCreateTempFile() {
-    return StorageSchemes.HDFS.getScheme().equals(getScheme());
+    return StorageSchemes.HDFS.getScheme().equals(getScheme())
+        // Local file will be visible immediately after LocalFileSystem#create(..), even before the output
+        // stream is closed, so temporary file is also needed for atomic file creating with content written.
+        || StorageSchemes.FILE.getScheme().equals(getScheme());
   }
 
   /**
diff --git a/packaging/bundle-validation/flink/insert.sql b/packaging/bundle-validation/flink/insert.sql
index 624aab5d357..b53b3f55263 100644
--- a/packaging/bundle-validation/flink/insert.sql
+++ b/packaging/bundle-validation/flink/insert.sql
@@ -17,6 +17,8 @@
  * under the License.
  */
 
+SET 'table.dml-sync' = 'true';
+
 CREATE TABLE t1
 (
     uuid        VARCHAR(20) PRIMARY KEY NOT ENFORCED,
diff --git a/packaging/bundle-validation/validate.sh b/packaging/bundle-validation/validate.sh
index ebaa590e9d1..3dfa53b67f5 100755
--- a/packaging/bundle-validation/validate.sh
+++ b/packaging/bundle-validation/validate.sh
@@ -182,7 +182,7 @@ test_flink_bundle() {
     change_java_runtime_version
     $FLINK_HOME/bin/start-cluster.sh
     $FLINK_HOME/bin/sql-client.sh -j $JARS_DIR/flink.jar -f $WORKDIR/flink/insert.sql
-    sleep 10 # for test stability
+    echo "validate flink insert finished."
     $WORKDIR/flink/compact.sh $JARS_DIR/flink.jar
     local EXIT_CODE=$?
     $FLINK_HOME/bin/stop-cluster.sh

