This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1dbd9d407a [minor] following 4270, add unit tests for the keys lost
case (#5918)
1dbd9d407a is described below
commit 1dbd9d407a354f01380148dbf2754a10bc8d600e
Author: Danny Chan <[email protected]>
AuthorDate: Wed Jun 22 16:56:06 2022 +0800
[minor] following 4270, add unit tests for the keys lost case (#5918)
---
.../table/timeline/TestHoodieActiveTimeline.java | 2 +-
.../org/apache/hudi/sink/TestWriteMergeOnRead.java | 25 ++++++++++++++++++++++
.../test/java/org/apache/hudi/utils/TestData.java | 19 ++++++++++++++++
3 files changed, 45 insertions(+), 1 deletion(-)
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
index 55806bf1e0..767a16f0c0 100755
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
@@ -516,7 +516,7 @@ public class TestHoodieActiveTimeline extends
HoodieCommonTestHarness {
}
executorService.shutdown();
- assertTrue(executorService.awaitTermination(10, TimeUnit.SECONDS));
+ assertTrue(executorService.awaitTermination(60, TimeUnit.SECONDS));
// required to catch exceptions
for (Future f : futures) {
f.get();
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index f2c0500f95..aa31d859bb 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -38,6 +38,31 @@ public class TestWriteMergeOnRead extends
TestWriteCopyOnWrite {
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
}
+ @Test
+ public void testIndexStateBootstrapWithMultiFilesInOneSlice() throws
Exception {
+ // open the function and ingest data
+ preparePipeline(conf)
+ .consume(TestData.filterOddRows(TestData.DATA_SET_INSERT))
+ .assertEmptyDataFiles()
+ .checkpoint(1)
+ .assertNextEvent()
+ .checkpointComplete(1)
+ .consume(TestData.filterEvenRows(TestData.DATA_SET_INSERT))
+ .checkpoint(2)
+ .assertNextEvent()
+ .checkpointComplete(2)
+ .checkWrittenData(EXPECTED1, 4)
+ // write another commit but does not complete it
+ .consume(TestData.filterEvenRows(TestData.DATA_SET_INSERT))
+ .checkpoint(3)
+ .assertNextEvent()
+ .end();
+
+ // reset the config option
+ conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
+ validateIndexLoaded();
+ }
+
@Test
public void testIndexStateBootstrapWithCompactionScheduled() throws
Exception {
// sets up the delta commits as 1 to generate a new compaction plan.
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index c31c2bbada..d03e1b2eb5 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -64,6 +64,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -304,6 +305,24 @@ public class TestData {
return inserts;
}
+ public static List<RowData> filterOddRows(List<RowData> rows) {
+ return filterRowsByIndexPredicate(rows, i -> i % 2 != 0);
+ }
+
+ public static List<RowData> filterEvenRows(List<RowData> rows) {
+ return filterRowsByIndexPredicate(rows, i -> i % 2 == 0);
+ }
+
+ private static List<RowData> filterRowsByIndexPredicate(List<RowData> rows,
Predicate<Integer> predicate) {
+ List<RowData> filtered = new ArrayList<>();
+ for (int i = 0; i < rows.size(); i++) {
+ if (predicate.test(i)) {
+ filtered.add(rows.get(i));
+ }
+ }
+ return filtered;
+ }
+
private static Integer toIdSafely(Object id) {
if (id == null) {
return -1;