This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch release-0.11.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 23b0dfc337737e2d1e5b792f999c311df62f9498
Author: Sivabalan Narayanan <n.siv...@gmail.com>
AuthorDate: Mon Apr 18 08:08:33 2022 -0400

    Fixing async clustering job test in TestHoodieDeltaStreamer (#5317)
---
 .../functional/TestHoodieDeltaStreamer.java        | 25 ++++++++++++++--------
 1 file changed, 16 insertions(+), 9 deletions(-)

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 2db72cbd41..804676f0ff 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -104,6 +104,7 @@ import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Tag;
@@ -123,6 +124,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -380,7 +382,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
             ret = false;
           }
         }
-        return true;
+        return ret;
       });
       res.get(timeoutInSecs, TimeUnit.SECONDS);
     }
@@ -1028,17 +1030,20 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     });
   }
 
-  @Disabled("HUDI-3710 to fix the ConcurrentModificationException")
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testHoodieAsyncClusteringJob(boolean shouldPassInClusteringInstantTime) throws Exception {
     String tableBasePath = dfsBasePath + "/asyncClusteringJob";
-
-    HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, "true");
+    HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, "false");
+    CountDownLatch countDownLatch = new CountDownLatch(1);
 
     deltaStreamerTestRunner(ds, (r) -> {
       TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
+      countDownLatch.countDown();
+      return true;
+    });
 
+    if (countDownLatch.await(2, TimeUnit.MINUTES)) {
       Option<String> scheduleClusteringInstantTime = Option.empty();
       try {
         HoodieClusteringJob scheduleClusteringJob =
@@ -1046,7 +1051,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
         scheduleClusteringInstantTime = scheduleClusteringJob.doSchedule();
       } catch (Exception e) {
         LOG.warn("Schedule clustering failed", e);
-        return false;
+        Assertions.fail("Schedule clustering failed", e);
       }
       if (scheduleClusteringInstantTime.isPresent()) {
         LOG.info("Schedule clustering success, now cluster with instant time " + scheduleClusteringInstantTime.get());
@@ -1054,13 +1059,15 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
             shouldPassInClusteringInstantTime ? scheduleClusteringInstantTime.get() : null, false);
         HoodieClusteringJob clusterClusteringJob = new HoodieClusteringJob(jsc, clusterClusteringConfig);
         clusterClusteringJob.cluster(clusterClusteringConfig.retry);
+        TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
         LOG.info("Cluster success");
       } else {
-        LOG.warn("Schedule clustering failed");
+        LOG.warn("Clustering execution failed");
+        Assertions.fail("Clustering execution failed");
       }
-      TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
-      return true;
-    });
+    } else {
+      Assertions.fail("Deltastreamer should have completed 2 commits.");
+    }
   }
 
   @Test

Reply via email to