This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new b6957f020d0  [HUDI-9205] Refactor Flink tests to avoid sleeping for data results (#13027)
b6957f020d0 is described below

commit b6957f020d00aca9d5c83f51b8e8f0bd102ac9bd
Author: Shuo Cheng <njucs...@gmail.com>
AuthorDate: Thu Mar 27 17:08:09 2025 +0800

    [HUDI-9205] Refactor Flink tests to avoid sleeping for data results (#13027)
---
 .../apache/hudi/sink/ITTestDataStreamWrite.java    |  56 ++-----
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 172 +++++++++++----------
 .../apache/hudi/table/ITTestSchemaEvolution.java   |   4 -
 .../org/apache/hudi/utils/FlinkMiniCluster.java    |  11 ++
 .../org/apache/hudi/utils/TestConfigurations.java  |  15 +-
 .../utils/factory/CollectSinkTableFactory.java     |  62 +++++++-
 .../org/apache/hudi/storage/HoodieStorage.java     |   5 +-
 packaging/bundle-validation/flink/insert.sql       |   2 +
 packaging/bundle-validation/validate.sh            |   2 +-
 9 files changed, 191 insertions(+), 138 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 41912c4f84e..f0aa1a9e8c6 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -41,20 +41,15 @@ import org.apache.hudi.utils.TestData;
 import org.apache.hudi.utils.TestUtils;
 import org.apache.hudi.utils.source.ContinuousFileSource;
 
-import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.io.FilePathFilter;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.io.TextInputFormat;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -63,7 +58,6 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.TestLogger;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
@@ -167,6 +161,8 @@ public class ITTestDataStreamWrite extends TestLogger {
     conf.setString(FlinkOptions.INDEX_TYPE, indexType);
     conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
     conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+    // use synchronized compaction to ensure flink job finishing with compaction completed.
+ conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name()); defaultWriteAndCheckExpected(conf, "mor_write_with_compact", 1); @@ -186,11 +182,13 @@ public class ITTestDataStreamWrite extends TestLogger { writeWithClusterAndCheckExpected(conf, "cow_write_with_cluster", 1, EXPECTED); } - @Disabled("HUDI-9196") @ParameterizedTest @ValueSource(strings = {"COPY_ON_WRITE", "MERGE_ON_READ"}) public void testStreamWriteWithIndexBootstrap(String tableType) throws Exception { Configuration conf = TestConfigurations.getDefaultConf(tempFile.toURI().toString()); + // use synchronized compaction to avoid sleeping for async compact. + conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); + conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, 1); conf.setString(FlinkOptions.TABLE_TYPE, tableType); writeAndCheckExpected( @@ -267,26 +265,14 @@ public class ITTestDataStreamWrite extends TestLogger { boolean isMor = conf.getString(FlinkOptions.TABLE_TYPE).equals(HoodieTableType.MERGE_ON_READ.name()); DataStream<RowData> dataStream; - if (isMor) { - TextInputFormat format = new TextInputFormat(new Path(sourcePath)); - format.setFilesFilter(FilePathFilter.createDefaultFilter()); - TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO; - format.setCharsetName("UTF-8"); - - dataStream = execEnv - // use PROCESS_CONTINUOUSLY mode to trigger checkpoint - .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo) - .map(JsonDeserializationFunction.getInstance(rowType)) - .setParallelism(1); - } else { - dataStream = execEnv - // use continuous file source to trigger checkpoint - .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), checkpoints)) - .name("continuous_file_source") - .setParallelism(1) - .map(JsonDeserializationFunction.getInstance(rowType)) - .setParallelism(4); - } + + dataStream = execEnv + // use continuous file source to trigger checkpoint + .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), checkpoints)) + .name("continuous_file_source") + .setParallelism(1) + .map(JsonDeserializationFunction.getInstance(rowType)) + .setParallelism(4); if (transformer.isPresent()) { dataStream = transformer.get().apply(dataStream); @@ -345,20 +331,8 @@ public class ITTestDataStreamWrite extends TestLogger { } public void execute(StreamExecutionEnvironment execEnv, boolean isMor, String jobName) throws Exception { - if (isMor) { - JobClient client = execEnv.executeAsync(jobName); - if (client.getJobStatus().get() != JobStatus.FAILED) { - try { - TimeUnit.SECONDS.sleep(35); // wait long enough for the compaction to finish - client.cancel(); - } catch (Throwable var1) { - // ignored - } - } - } else { - // wait for the streaming job to finish - execEnv.execute(jobName); - } + // wait for the streaming job to finish + execEnv.execute(jobName); } @Test diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index 891f3bd1206..997d266fd4e 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -47,13 +47,14 @@ import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; import 
org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.apache.flink.table.catalog.ObjectPath; -import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.flink.table.data.RowData; import org.apache.flink.types.Row; import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.ExceptionUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -83,6 +84,7 @@ import java.util.stream.IntStream; import java.util.stream.Stream; import static org.apache.hudi.utils.TestConfigurations.catalog; +import static org.apache.hudi.utils.TestConfigurations.getCollectSinkDDL; import static org.apache.hudi.utils.TestConfigurations.sql; import static org.apache.hudi.utils.TestData.array; import static org.apache.hudi.utils.TestData.assertRowsEquals; @@ -147,19 +149,19 @@ public class ITTestHoodieDataSource { .option(FlinkOptions.READ_START_COMMIT, firstCommit) .end(); streamTableEnv.executeSql(hoodieTableDDL); - List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10); + List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", TestData.DATA_SET_SOURCE_INSERT.size()); assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT); // insert another batch of data execInsertSql(streamTableEnv, insertInto); - List<Row> rows2 = execSelectSql(streamTableEnv, "select * from t1", 10); + List<Row> rows2 = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", TestData.DATA_SET_SOURCE_INSERT.size()); assertRowsEquals(rows2, TestData.DATA_SET_SOURCE_INSERT); streamTableEnv.getConfig().getConfiguration() .setBoolean("table.dynamic-table-options.enabled", true); // specify the start commit as earliest - List<Row> rows3 = execSelectSql(streamTableEnv, - "select * from t1/*+options('read.start-commit'='earliest')*/", 10); + List<Row> rows3 = execSelectSqlWithExpectedNum(streamTableEnv, + "select * from t1/*+options('read.start-commit'='earliest')*/", TestData.DATA_SET_SOURCE_INSERT.size()); assertRowsEquals(rows3, TestData.DATA_SET_SOURCE_INSERT); } @@ -184,18 +186,18 @@ public class ITTestHoodieDataSource { execInsertSql(streamTableEnv, insertInto); String firstCommit = TestUtils.getFirstCompleteInstant(tempFile.getAbsolutePath()); - List<Row> rows = execSelectSql(streamTableEnv, - "select * from t1/*+options('read.start-commit'='" + firstCommit + "')*/", 10); + List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, + "select * from t1/*+options('read.start-commit'='" + firstCommit + "')*/", TestData.DATA_SET_SOURCE_INSERT.size()); assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT); // insert another batch of data execInsertSql(streamTableEnv, TestSQL.UPDATE_INSERT_T1); - List<Row> rows2 = execSelectSql(streamTableEnv, "select * from t1", 10); + List<Row> rows2 = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", TestData.DATA_SET_SOURCE_CHANGELOG.size()); assertRowsEquals(rows2, TestData.DATA_SET_SOURCE_CHANGELOG); // specify the start commit as earliest - List<Row> rows3 = execSelectSql(streamTableEnv, - "select * from t1/*+options('read.start-commit'='earliest')*/", 10); + List<Row> rows3 = execSelectSqlWithExpectedNum(streamTableEnv, + "select * from t1/*+options('read.start-commit'='earliest')*/", 
TestData.DATA_SET_SOURCE_MERGED.size()); assertRowsEquals(rows3, TestData.DATA_SET_SOURCE_MERGED); } @@ -218,12 +220,12 @@ public class ITTestHoodieDataSource { execInsertSql(streamTableEnv, insertInto); // reading from the latest commit instance. - List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10); + List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT.size()); assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT); // insert another batch of data execInsertSql(streamTableEnv, insertInto); - List<Row> rows2 = execSelectSql(streamTableEnv, "select * from t1", 10); + List<Row> rows2 = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT.size()); assertRowsEquals(rows2, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT); } @@ -259,7 +261,7 @@ public class ITTestHoodieDataSource { .option(FlinkOptions.READ_START_COMMIT, specifiedCommit) .end(); streamTableEnv.executeSql(createHoodieTable2); - List<Row> rows = execSelectSql(streamTableEnv, "select * from t2", 10); + List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t2", TestData.DATA_SET_SOURCE_MERGED.size()); // all the data with same keys are appended within one data bucket and one log file, // so when consume, the same keys are merged assertRowsEquals(rows, TestData.DATA_SET_SOURCE_MERGED); @@ -360,7 +362,7 @@ public class ITTestHoodieDataSource { streamTableEnv.getConfig().getConfiguration() .setBoolean("table.dynamic-table-options.enabled", true); final String query = String.format("select * from t1/*+ options('read.start-commit'='%s')*/", instant); - List<Row> rows = execSelectSql(streamTableEnv, query, 10); + List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, query, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT.size()); assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT); } @@ -388,7 +390,7 @@ public class ITTestHoodieDataSource { streamTableEnv.getConfig().getConfiguration() .setBoolean("table.dynamic-table-options.enabled", true); final String query = String.format("select * from t1/*+ options('read.start-commit'='%s')*/", instant); - List<Row> rows = execSelectSql(streamTableEnv, query, 10); + List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, query, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT.size()); assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT); } @@ -416,7 +418,7 @@ public class ITTestHoodieDataSource { final String query = String.format("select * from t1/*+ options('read.start-commit'='%s')*/", FlinkOptions.START_COMMIT_EARLIEST); - List<Row> rows = execSelectSql(streamTableEnv, query, 10); + List<Row> rows = execSelectSql(streamTableEnv, query); // batch read will not lose data when cleaned clustered files. 
assertRowsEquals(rows, CollectionUtils.combine(TestData.DATA_SET_SOURCE_INSERT_FIRST_COMMIT, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT)); @@ -502,10 +504,10 @@ public class ITTestHoodieDataSource { + " name varchar(20),\n" + " age_sum int\n" + ") with (\n" - + " 'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'" + + " 'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "',\n" + + " 'sink-expected-row-num' = '2'" + ")"; - List<Row> result = execSelectSql(streamTableEnv, - "select name, sum(age) from t1 group by name", sinkDDL, 10); + List<Row> result = execSelectSqlWithExpectedNum(streamTableEnv, "select name, sum(age) from t1 group by name", sinkDDL); final String expected = "[+I(+I[Danny, 24]), +I(+I[Stephen, 34])]"; assertRowsEquals(result, expected, true); } @@ -556,8 +558,9 @@ public class ITTestHoodieDataSource { + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4]]", "[+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], " + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4]]"); + List<Integer> expectedNums = Arrays.asList(8, 3, 2); for (int i = 0; i < sqls.size(); i++) { - List<Row> result = execSelectSql(streamTableEnv, sqls.get(i), 10); + List<Row> result = execSelectSqlWithExpectedNum(streamTableEnv, sqls.get(i), expectedNums.get(i)); assertRowsEquals(result, expectResults.get(i)); } } @@ -583,11 +586,11 @@ public class ITTestHoodieDataSource { .end(); streamTableEnv.executeSql(hoodieTableDDL); - List<Row> result = execSelectSql(streamTableEnv, - "select * from t1 where `partition`='par1'", 10); final String expected = "[" + "+I(+I[id1, Danny, 23, 1970-01-01T00:00:00.001, par1]), " + "+I(+I[id2, Stephen, 33, 1970-01-01T00:00:00.002, par1])]"; + List<Row> result = execSelectSqlWithExpectedNum(streamTableEnv, + "select * from t1 where `partition`='par1'", 2); assertRowsEquals(result, expected, true); } @@ -612,18 +615,8 @@ public class ITTestHoodieDataSource { String insertInto = "insert into t1 select * from source"; execInsertSql(streamTableEnv, insertInto); - - List<Row> result = execSelectSql(streamTableEnv, "select * from t1", 10); - final String expected = "[" - + "+I[id1, Danny, 23, 1970-01-01T00:00:01, par1], " - + "+I[id2, Stephen, 33, 1970-01-01T00:00:02, par1], " - + "+I[id3, Julian, 53, 1970-01-01T00:00:03, par2], " - + "+I[id4, Fabian, 31, 1970-01-01T00:00:04, par2], " - + "+I[id5, Sophia, 18, 1970-01-01T00:00:05, par3], " - + "+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], " - + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], " - + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]"; - assertRowsEquals(result, expected); + List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", TestData.DATA_SET_SOURCE_INSERT.size()); + assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT); } @ParameterizedTest @@ -821,13 +814,12 @@ public class ITTestHoodieDataSource { + "+I[id7, Bob, 44, par4, 1970-01-01T00:00:07], " + "+I[id8, Han, 56, par4, 1970-01-01T00:00:08]]"; - List<Row> result = execSelectSql(streamTableEnv, "select * from t1", execMode); - + List<Row> result = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", "t1", 8); assertRowsEquals(result, expected); // insert another batch of data execInsertSql(streamTableEnv, insertInto); - List<Row> result2 = execSelectSql(streamTableEnv, "select * from t1", execMode); + List<Row> result2 = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", "t1", 8); assertRowsEquals(result2, expected); } @@ -861,12 +853,12 @@ public class ITTestHoodieDataSource { + "+I[4, Fabian, 
2021-12-04T15:16:04.400004], " + "+I[5, Tom, 2721-12-04T15:16:04.500005]]"; - List<Row> result = execSelectSql(streamTableEnv, "select * from t1", execMode); + List<Row> result = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", "t1", 5); assertRowsEquals(result, expected); // insert another batch of data execInsertSql(streamTableEnv, insertInto); - List<Row> result2 = execSelectSql(streamTableEnv, "select * from t1", execMode); + List<Row> result2 = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", "t1", 5); assertRowsEquals(result2, expected); } @@ -959,7 +951,7 @@ public class ITTestHoodieDataSource { execInsertSql(streamTableEnv, insertInto); // reading from the earliest commit instance. - List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 20); + List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", TestData.DATA_SET_SOURCE_INSERT.size()); assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT); } @@ -1162,9 +1154,9 @@ public class ITTestHoodieDataSource { .end(); batchTableEnv.executeSql(createHoodieTable); - // no exception expects to be thrown - assertThrows(Exception.class, - () -> execSelectSql(batchTableEnv, "select * from t1", 10), + // ValidationException expects to be thrown + assertThrows(ValidationException.class, + () -> execSelectSql(batchTableEnv, "select * from t1"), "Exception should throw when querying non-exists table in batch mode"); // case2: empty table without data files @@ -1221,7 +1213,10 @@ public class ITTestHoodieDataSource { + "+I[109, 9000, spare tire, 22.200000762939453], " + "+I[110, 14000, jacket, 0.5]]"; - List<Row> result = execSelectSql(streamTableEnv, "select * from hoodie_sink", execMode); + List<Row> result = + execMode == ExecMode.STREAM + ? 
execSelectSqlWithExpectedNum(streamTableEnv, "select * from hoodie_sink", "hoodie_sink", 10) + : execSelectSql(streamTableEnv, "select * from hoodie_sink"); assertRowsEquals(result, expected); } @@ -1554,7 +1549,6 @@ public class ITTestHoodieDataSource { assertRowsEquals(result2.subList(result2.size() - 2, result2.size()), "[-U[1], +U[2]]"); } - @Disabled("HUDI-9196") @ParameterizedTest @EnumSource(value = HoodieTableType.class) void testIncrementalReadArchivedCommits(HoodieTableType tableType) throws Exception { @@ -2189,7 +2183,7 @@ public class ITTestHoodieDataSource { .end(); batchTableEnv.executeSql(readHoodieTableDDL); - List<Row> result = execSelectSql(batchTableEnv, "select * from t1", ExecMode.BATCH); + List<Row> result = execSelectSql(batchTableEnv, "select * from t1"); assertRowsEquals(result, expected1.toString()); batchTableEnv.executeSql("drop table t1"); @@ -2211,7 +2205,7 @@ public class ITTestHoodieDataSource { .end(); batchTableEnv.executeSql(readHoodieTableDDL); - result = execSelectSql(batchTableEnv, "select * from t1", ExecMode.BATCH); + result = execSelectSql(batchTableEnv, "select * from t1"); assertRowsEquals(result, expected2.toString()); batchTableEnv.executeSql("drop table t1"); @@ -2233,12 +2227,11 @@ public class ITTestHoodieDataSource { .end(); batchTableEnv.executeSql(readHoodieTableDDL); - result = execSelectSql(batchTableEnv, "select * from t1", ExecMode.BATCH); + result = execSelectSql(batchTableEnv, "select * from t1"); assertRowsEquals(result, expected3.toString()); } - @Disabled("HUDI-9196") @ParameterizedTest @MethodSource("tableTypeAndBooleanTrueFalseParams") void testDynamicPartitionPrune(HoodieTableType tableType, boolean hiveStylePartitioning) throws Exception { @@ -2262,11 +2255,11 @@ public class ITTestHoodieDataSource { // launch a streaming query TableResult tableResult = submitSelectSql(streamTableEnv, "select uuid, name, age, ts, `partition` as part from t1 where `partition` > 'par4'", - TestConfigurations.getCollectSinkDDL("sink")); + TestConfigurations.getCollectSinkDDLWithExpectedNum("sink", TestData.DATA_SET_INSERT_SEPARATE_PARTITION.size())); // write second commit TestData.writeData(TestData.DATA_SET_INSERT_SEPARATE_PARTITION, conf); // stop the streaming query and get data - List<Row> actualResult = fetchResult(streamTableEnv, tableResult, 10); + List<Row> actualResult = fetchResultWithExpectedNum(streamTableEnv, tableResult); assertRowsEquals(actualResult, TestData.DATA_SET_INSERT_SEPARATE_PARTITION); } @@ -2326,8 +2319,7 @@ public class ITTestHoodieDataSource { @ParameterizedTest @MethodSource("parametersForMetaColumnsSkip") - void testWriteWithoutMetaColumns(HoodieTableType tableType, WriteOperationType operation) - throws TableNotExistException, InterruptedException { + void testWriteWithoutMetaColumns(HoodieTableType tableType, WriteOperationType operation) throws Exception { String createSource = TestConfigurations.getFileSourceDDL("source"); streamTableEnv.executeSql(createSource); @@ -2346,7 +2338,7 @@ public class ITTestHoodieDataSource { .option(FlinkOptions.TABLE_TYPE, tableType) .end(); streamTableEnv.executeSql(hoodieTableDDL); - List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10); + List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", TestData.DATA_SET_SOURCE_INSERT.size()); assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT); } @@ -2402,7 +2394,7 @@ public class ITTestHoodieDataSource { execInsertSql(streamTableEnv, insertInto); // reading from the earliest - List<Row> 
rows = execSelectSql(streamTableEnv, "select * from t1", 10); + List<Row> rows = execSelectSqlWithExpectedNum(streamTableEnv, "select * from t1", TestData.DATA_SET_SOURCE_INSERT.size()); assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT); } @@ -2527,44 +2519,44 @@ public class ITTestHoodieDataSource { } } - private List<Row> execSelectSql(TableEnvironment tEnv, String select, ExecMode execMode) - throws TableNotExistException, InterruptedException { - final String[] splits = select.split(" "); - final String tableName = splits[splits.length - 1]; - switch (execMode) { - case STREAM: - return execSelectSql(tEnv, select, 10, tableName); - case BATCH: - return CollectionUtil.iterableToList( - () -> tEnv.sqlQuery("select * from " + tableName).execute().collect()); - default: - throw new AssertionError(); - } + /** + * Use TableResult#collect() to collect results directly for bounded source. + */ + private List<Row> execSelectSql(TableEnvironment tEnv, String select) { + return CollectionUtil.iterableToList( + () -> tEnv.sqlQuery(select).execute().collect()); } - private List<Row> execSelectSql(TableEnvironment tEnv, String select, long timeout) - throws InterruptedException, TableNotExistException { - return execSelectSql(tEnv, select, timeout, null); + /** + * Use CollectTableSink to collect results with expected row number. + */ + private List<Row> execSelectSqlWithExpectedNum(TableEnvironment tEnv, String select, int expectedNum) throws Exception { + return execSelectSqlWithExpectedNum(tEnv, select, null, expectedNum); } - private List<Row> execSelectSql(TableEnvironment tEnv, String select, long timeout, String sourceTable) - throws InterruptedException, TableNotExistException { + /** + * Use CollectTableSink to collect results with expected row number. + */ + private List<Row> execSelectSqlWithExpectedNum(TableEnvironment tEnv, String select, String sourceTable, int expectedNum) + throws Exception { final String sinkDDL; if (sourceTable != null) { - // use the source table schema as the sink schema if the source table was specified, . + // use the source table schema as the sink schema if the source table was specified. ObjectPath objectPath = new ObjectPath(tEnv.getCurrentDatabase(), sourceTable); TableSchema schema = tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTable(objectPath).getSchema(); - sinkDDL = TestConfigurations.getCollectSinkDDL("sink", schema); + sinkDDL = TestConfigurations.getCollectSinkDDLWithExpectedNum("sink", schema, expectedNum); } else { - sinkDDL = TestConfigurations.getCollectSinkDDL("sink"); + sinkDDL = TestConfigurations.getCollectSinkDDLWithExpectedNum("sink", expectedNum); } - return execSelectSql(tEnv, select, sinkDDL, timeout); + return execSelectSqlWithExpectedNum(tEnv, select, sinkDDL); } - private List<Row> execSelectSql(TableEnvironment tEnv, String select, String sinkDDL, long timeout) - throws InterruptedException { + /** + * Use CollectTableSink to collect results with expected row number. 
+ */ + private List<Row> execSelectSqlWithExpectedNum(TableEnvironment tEnv, String select, String sinkDDL) { TableResult tableResult = submitSelectSql(tEnv, select, sinkDDL); - return fetchResult(tEnv, tableResult, timeout); + return fetchResultWithExpectedNum(tEnv, tableResult); } private TableResult submitSelectSql(TableEnvironment tEnv, String select, String sinkDDL) { @@ -2574,14 +2566,28 @@ public class ITTestHoodieDataSource { return tableResult; } - private List<Row> fetchResult(TableEnvironment tEnv, TableResult tableResult, long timeout) - throws InterruptedException { - // wait for the timeout then cancels the job + private List<Row> execSelectSql(TableEnvironment tEnv, String select, long timeout) throws InterruptedException { + TableResult tableResult = submitSelectSql(tEnv, select, getCollectSinkDDL("sink")); TimeUnit.SECONDS.sleep(timeout); + // wait for the timeout then cancels the job tableResult.getJobClient().ifPresent(JobClient::cancel); tEnv.executeSql("DROP TABLE IF EXISTS sink"); return CollectSinkTableFactory.RESULT.values().stream() .flatMap(Collection::stream) .collect(Collectors.toList()); } + + private List<Row> fetchResultWithExpectedNum(TableEnvironment tEnv, TableResult tableResult) { + try { + // wait the continuous streaming query to be terminated by forced exception with expected row number + // and max waiting timeout is 30s + tableResult.await(30, TimeUnit.SECONDS); + } catch (Throwable e) { + ExceptionUtils.assertThrowable(e, CollectSinkTableFactory.SuccessException.class); + } + tEnv.executeSql("DROP TABLE IF EXISTS sink"); + return CollectSinkTableFactory.RESULT.values().stream() + .flatMap(Collection::stream) + .collect(Collectors.toList()); + } } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java index c0dced5a613..5026af91a96 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java @@ -36,7 +36,6 @@ import org.apache.hudi.utils.FlinkMiniCluster; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.CoreOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; @@ -81,9 +80,6 @@ public class ITTestSchemaEvolution { public void setUp() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); tEnv = StreamTableEnvironment.create(env); - // flink job uses child-first classloader by default, async services fired by flink job are not guaranteed - // to be killed right away, which then may trigger classloader leak checking exception - tEnv.getConfig().getConfiguration().set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false); } @Test diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/FlinkMiniCluster.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/FlinkMiniCluster.java index bfddb43578c..da4442890e5 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/FlinkMiniCluster.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/FlinkMiniCluster.java @@ 
-18,6 +18,8 @@ package org.apache.hudi.utils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.test.util.AbstractTestBase; @@ -41,6 +43,7 @@ public class FlinkMiniCluster implements BeforeAllCallback, AfterAllCallback, Af private static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getDefaultConfig()) .setNumberTaskManagers(1) .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM) .build()); @@ -60,6 +63,14 @@ public class FlinkMiniCluster implements BeforeAllCallback, AfterAllCallback, Af cleanupRunningJobs(); } + private static Configuration getDefaultConfig() { + Configuration config = new Configuration(); + // flink job uses child-first classloader by default, async services fired by flink job are not + // guaranteed to be killed right away, which then may trigger classloader leak checking exception. + config.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false); + return config; + } + private void cleanupRunningJobs() throws Exception { if (!MINI_CLUSTER_RESOURCE.getMiniCluster().isRunning()) { // do nothing if the MiniCluster is not running diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java index 92d44a8e7ef..ed5aef764ab 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java @@ -244,6 +244,11 @@ public class TestConfigurations { } public static String getCollectSinkDDL(String tableName) { + // set expectedRowNum as -1 to disable forced exception to terminate a successful sink + return getCollectSinkDDLWithExpectedNum(tableName, -1); + } + + public static String getCollectSinkDDLWithExpectedNum(String tableName, int expectedRowNum) { return "create table " + tableName + "(\n" + " uuid varchar(20),\n" + " name varchar(10),\n" @@ -251,11 +256,16 @@ public class TestConfigurations { + " ts timestamp(3),\n" + " `partition` varchar(20)\n" + ") with (\n" - + " 'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'" + + " 'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "',\n" + + " 'sink-expected-row-num' = '" + expectedRowNum + "'" + ")"; } public static String getCollectSinkDDL(String tableName, TableSchema tableSchema) { + return getCollectSinkDDLWithExpectedNum(tableName, tableSchema, -1); + } + + public static String getCollectSinkDDLWithExpectedNum(String tableName, TableSchema tableSchema, int expectRowNum) { final StringBuilder builder = new StringBuilder("create table " + tableName + "(\n"); String[] fieldNames = tableSchema.getFieldNames(); DataType[] fieldTypes = tableSchema.getFieldDataTypes(); @@ -271,7 +281,8 @@ public class TestConfigurations { } final String withProps = "" + ") with (\n" - + " 'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'\n" + + " 'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "',\n" + + " 'sink-expected-row-num' = '" + expectRowNum + "'" + ")"; builder.append(withProps); return builder.toString(); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java index da0761a7542..ebce7c4ddbd 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java @@ -18,12 +18,14 @@ package org.apache.hudi.utils.factory; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.util.ChangelogModes; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; @@ -43,6 +45,7 @@ import org.apache.flink.types.Row; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -59,14 +62,19 @@ public class CollectSinkTableFactory implements DynamicTableSinkFactory { // global results to collect and query public static final Map<Integer, List<Row>> RESULT = new HashMap<>(); + // options + private static final ConfigOption<Integer> SINK_EXPECTED_ROW_NUM = + ConfigOptions.key("sink-expected-row-num").intType().defaultValue(-1); + @Override public DynamicTableSink createDynamicTableSink(Context context) { FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); helper.validate(); TableSchema schema = context.getCatalogTable().getSchema(); + int expectRowNum = helper.getOptions().get(SINK_EXPECTED_ROW_NUM); RESULT.clear(); - return new CollectTableSink(schema, context.getObjectIdentifier().getObjectName()); + return new CollectTableSink(schema, context.getObjectIdentifier().getObjectName(), expectRowNum); } @Override @@ -81,7 +89,7 @@ public class CollectSinkTableFactory implements DynamicTableSinkFactory { @Override public Set<ConfigOption<?>> optionalOptions() { - return Collections.emptySet(); + return new HashSet<>(Collections.singletonList(SINK_EXPECTED_ROW_NUM)); } // -------------------------------------------------------------------------------------------- @@ -95,12 +103,15 @@ public class CollectSinkTableFactory implements DynamicTableSinkFactory { private final TableSchema schema; private final String tableName; + private final int expectedRowNum; private CollectTableSink( TableSchema schema, - String tableName) { + String tableName, + int expectedRowNum) { this.schema = schema; this.tableName = tableName; + this.expectedRowNum = expectedRowNum; } @Override @@ -113,12 +124,16 @@ public class CollectSinkTableFactory implements DynamicTableSinkFactory { final DataType rowType = schema.toPhysicalRowDataType(); final RowTypeInfo rowTypeInfo = (RowTypeInfo) TypeConversions.fromDataTypeToLegacyInfo(rowType); DataStructureConverter converter = context.createDataStructureConverter(schema.toPhysicalRowDataType()); - return SinkFunctionProvider.of(new CollectSinkFunction(converter, rowTypeInfo)); + if (expectedRowNum != -1) { + return SinkFunctionProvider.of(new CollectSinkFunctionWithExpectedNum(converter, rowTypeInfo, expectedRowNum)); + } else { + return SinkFunctionProvider.of(new CollectSinkFunction(converter, 
rowTypeInfo)); + } } @Override public DynamicTableSink copy() { - return new CollectTableSink(schema, tableName); + return new CollectTableSink(schema, tableName, expectedRowNum); } @Override @@ -170,7 +185,42 @@ public class CollectSinkTableFactory implements DynamicTableSinkFactory { @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { resultState.clear(); - resultState.addAll(RESULT.get(taskID)); + List<Row> rows = RESULT.get(taskID); + if (rows != null) { + resultState.addAll(RESULT.get(taskID)); + } + } + } + + static class CollectSinkFunctionWithExpectedNum extends CollectSinkFunction { + private final int expectRowNum; + + protected CollectSinkFunctionWithExpectedNum( + DynamicTableSink.DataStructureConverter converter, + RowTypeInfo rowTypeInfo, + int expectRowNum) { + super(converter, rowTypeInfo); + ValidationUtils.checkArgument(expectRowNum > 0, "Expected row number should be positive."); + this.expectRowNum = expectRowNum; + } + + @Override + public void invoke(RowData value, Context context) { + super.invoke(value, context); + if (RESULT.values().stream().mapToInt(List::size).sum() >= expectRowNum) { + throw new SuccessException(); + } + } + } + + /** + * Exception that is thrown to terminate a program and indicate success. + */ + public static class SuccessException extends RuntimeException { + private static final long serialVersionUID = 1L; + + public SuccessException() { + super("Forced exception to terminate a successful sink."); } } } diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java index d11bd7fc6e9..adcbf6a630f 100644 --- a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java +++ b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java @@ -389,7 +389,10 @@ public abstract class HoodieStorage implements Closeable { */ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) public final boolean needCreateTempFile() { - return StorageSchemes.HDFS.getScheme().equals(getScheme()); + return StorageSchemes.HDFS.getScheme().equals(getScheme()) + // Local file will be visible immediately after LocalFileSystem#create(..), even before the output + // stream is closed, so temporary file is also needed for atomic file creating with content written. + || StorageSchemes.FILE.getScheme().equals(getScheme()); } /** diff --git a/packaging/bundle-validation/flink/insert.sql b/packaging/bundle-validation/flink/insert.sql index 624aab5d357..b53b3f55263 100644 --- a/packaging/bundle-validation/flink/insert.sql +++ b/packaging/bundle-validation/flink/insert.sql @@ -17,6 +17,8 @@ * under the License. */ +SET 'table.dml-sync' = 'true'; + CREATE TABLE t1 ( uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED, diff --git a/packaging/bundle-validation/validate.sh b/packaging/bundle-validation/validate.sh index ebaa590e9d1..3dfa53b67f5 100755 --- a/packaging/bundle-validation/validate.sh +++ b/packaging/bundle-validation/validate.sh @@ -182,7 +182,7 @@ test_flink_bundle() { change_java_runtime_version $FLINK_HOME/bin/start-cluster.sh $FLINK_HOME/bin/sql-client.sh -j $JARS_DIR/flink.jar -f $WORKDIR/flink/insert.sql - sleep 10 # for test stability + echo "validate flink insert finished." $WORKDIR/flink/compact.sh $JARS_DIR/flink.jar local EXIT_CODE=$? $FLINK_HOME/bin/stop-cluster.sh