This is an automated email from the ASF dual-hosted git repository. xushiyan pushed a commit to branch release-0.11.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 23b0dfc337737e2d1e5b792f999c311df62f9498 Author: Sivabalan Narayanan <n.siv...@gmail.com> AuthorDate: Mon Apr 18 08:08:33 2022 -0400 Fixing async clustering job test in TestHoodieDeltaStreamer (#5317) --- .../functional/TestHoodieDeltaStreamer.java | 25 ++++++++++++++-------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 2db72cbd41..804676f0ff 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -104,6 +104,7 @@ import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Tag; @@ -123,6 +124,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -380,7 +382,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { ret = false; } } - return true; + return ret; }); res.get(timeoutInSecs, TimeUnit.SECONDS); } @@ -1028,17 +1030,20 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { }); } - @Disabled("HUDI-3710 to fix the ConcurrentModificationException") @ParameterizedTest @ValueSource(booleans = {true, false}) public void testHoodieAsyncClusteringJob(boolean shouldPassInClusteringInstantTime) throws Exception { String tableBasePath = dfsBasePath + "/asyncClusteringJob"; - - HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, "true"); + HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, "false"); + CountDownLatch countDownLatch = new CountDownLatch(1); deltaStreamerTestRunner(ds, (r) -> { TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs); + countDownLatch.countDown(); + return true; + }); + if (countDownLatch.await(2, TimeUnit.MINUTES)) { Option<String> scheduleClusteringInstantTime = Option.empty(); try { HoodieClusteringJob scheduleClusteringJob = @@ -1046,7 +1051,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { scheduleClusteringInstantTime = scheduleClusteringJob.doSchedule(); } catch (Exception e) { LOG.warn("Schedule clustering failed", e); - return false; + Assertions.fail("Schedule clustering failed", e); } if (scheduleClusteringInstantTime.isPresent()) { LOG.info("Schedule clustering success, now cluster with instant time " + scheduleClusteringInstantTime.get()); @@ -1054,13 +1059,15 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { shouldPassInClusteringInstantTime ? scheduleClusteringInstantTime.get() : null, false); HoodieClusteringJob clusterClusteringJob = new HoodieClusteringJob(jsc, clusterClusteringConfig); clusterClusteringJob.cluster(clusterClusteringConfig.retry); + TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs); LOG.info("Cluster success"); } else { - LOG.warn("Schedule clustering failed"); + LOG.warn("Clustering execution failed"); + Assertions.fail("Clustering execution failed"); } - TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs); - return true; - }); + } else { + Assertions.fail("Deltastreamer should have completed 2 commits."); + } } @Test