This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a00e0cea8 [INLONG-9034][Sort] Fix sort redis test with incorrect use of sleep (#9327)
6a00e0cea8 is described below

commit 6a00e0cea8853f0faa6e99cfaccd1a648c51cea9
Author: Sting <zpen...@connect.ust.hk>
AuthorDate: Fri Nov 24 10:29:12 2023 +0800

    [INLONG-9034][Sort] Fix sort redis test with incorrect use of sleep (#9327)
---
 .../sort-flink-v1.13/sort-connectors/redis/pom.xml |  4 ++
 .../apache/inlong/sort/redis/RedisTableTest.java   | 83 +++++++++++-----------
 2 files changed, 44 insertions(+), 43 deletions(-)

diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/redis/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/redis/pom.xml
index 81af253242..fa6c4e7c2a 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/redis/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/redis/pom.xml
@@ -120,6 +120,10 @@
             <artifactId>flink-clients_2.11</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/redis/src/test/java/org/apache/inlong/sort/redis/RedisTableTest.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/redis/src/test/java/org/apache/inlong/sort/redis/RedisTableTest.java
index 4f393c5652..ec869e3eef 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/redis/src/test/java/org/apache/inlong/sort/redis/RedisTableTest.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/redis/src/test/java/org/apache/inlong/sort/redis/RedisTableTest.java
@@ -30,7 +30,9 @@ import redis.clients.jedis.Jedis;
 import redis.embedded.RedisServer;
 
 import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
 
+import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -64,7 +66,7 @@ public class RedisTableTest {
     }
 
     @Test
-    public void testSinkWithPlain() throws Exception {
+    public void testSinkWithPlain() {
         StreamExecutionEnvironment executionEnv =
                 StreamExecutionEnvironment.getExecutionEnvironment();
         StreamTableEnvironment tableEnv =
@@ -72,8 +74,6 @@ public class RedisTableTest {
 
         executionEnv.setParallelism(1);
 
-        String address = "localhost:" + redisPort;
-
         DataStream<Row> source =
                 executionEnv.fromCollection(
                         Arrays.asList(
@@ -109,15 +109,16 @@ public class RedisTableTest {
         String query = "INSERT INTO sink SELECT * FROM source";
         tableEnv.executeSql(query);
 
-        Thread.sleep(4000);
+        await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertEquals("r12,1.2,1", jedis.get("1"));
+            assertEquals("r22,2.2,2", jedis.get("2"));
+            assertEquals("r32,3.2,3", jedis.get("3"));
+        });
 
-        assertEquals("r12,1.2,1", jedis.get("1"));
-        assertEquals("r22,2.2,2", jedis.get("2"));
-        assertEquals("r32,3.2,3", jedis.get("3"));
     }
 
     @Test
-    public void testSinkWithHashPrefixMatch() throws Exception {
+    public void testSinkWithHashPrefixMatch() {
         StreamExecutionEnvironment executionEnv =
                 StreamExecutionEnvironment.getExecutionEnvironment();
         StreamTableEnvironment tableEnv =
@@ -125,8 +126,6 @@ public class RedisTableTest {
 
         executionEnv.setParallelism(1);
 
-        String address = "localhost:" + redisPort;
-
         DataStream<Row> source =
                 executionEnv.fromCollection(
                         Arrays.asList(
@@ -162,15 +161,16 @@ public class RedisTableTest {
         String query = "INSERT INTO sink SELECT * FROM source";
         tableEnv.executeSql(query);
 
-        Thread.sleep(4000);
+        await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertEquals("1.2,1", jedis.hget("1", "r12"));
+            assertEquals("2.2,2", jedis.hget("2", "r22"));
+            assertEquals("3.2,3", jedis.hget("3", "r32"));
+        });
 
-        assertEquals("1.2,1", jedis.hget("1", "r12"));
-        assertEquals("2.2,2", jedis.hget("2", "r22"));
-        assertEquals("3.2,3", jedis.hget("3", "r32"));
     }
 
     @Test
-    public void testSinkWithHashKvPair() throws Exception {
+    public void testSinkWithHashKvPair() {
         StreamExecutionEnvironment executionEnv =
                 StreamExecutionEnvironment.getExecutionEnvironment();
         StreamTableEnvironment tableEnv =
@@ -178,8 +178,6 @@ public class RedisTableTest {
 
         executionEnv.setParallelism(1);
 
-        String address = "localhost:" + redisPort;
-
         DataStream<Row> source =
                 executionEnv.fromCollection(
                         Arrays.asList(
@@ -217,18 +215,19 @@ public class RedisTableTest {
         String query = "INSERT INTO sink SELECT aaa,bbb,cast(ccc as STRING),ddd, cast(eee as STRING) FROM source";
         tableEnv.executeSql(query);
 
-        Thread.sleep(4000);
+        await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertEquals("1.2", jedis.hget("1", "r12"));
+            assertEquals("2.2", jedis.hget("2", "r22"));
+            assertEquals("3.2", jedis.hget("3", "r32"));
+            assertEquals("1", jedis.hget("1", "r14"));
+            assertEquals("2", jedis.hget("2", "r24"));
+            assertEquals("3", jedis.hget("3", "r34"));
+        });
 
-        assertEquals("1.2", jedis.hget("1", "r12"));
-        assertEquals("2.2", jedis.hget("2", "r22"));
-        assertEquals("3.2", jedis.hget("3", "r32"));
-        assertEquals("1", jedis.hget("1", "r14"));
-        assertEquals("2", jedis.hget("2", "r24"));
-        assertEquals("3", jedis.hget("3", "r34"));
     }
 
     @Test
-    public void testSinkWithDynamic() throws Exception {
+    public void testSinkWithDynamic() {
         StreamExecutionEnvironment executionEnv =
                 StreamExecutionEnvironment.getExecutionEnvironment();
         StreamTableEnvironment tableEnv =
@@ -236,8 +235,6 @@ public class RedisTableTest {
 
         executionEnv.setParallelism(1);
 
-        String address = "localhost:" + redisPort;
-
         DataStream<Row> source =
                 executionEnv.fromCollection(
                         Arrays.asList(
@@ -276,14 +273,15 @@ public class RedisTableTest {
                 + "FROM source";
         tableEnv.executeSql(query);
 
-        Thread.sleep(4000);
+        await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertEquals("1.2", jedis.hget("1", "r12"));
+            assertEquals("2.2", jedis.hget("2", "r22"));
+            assertEquals("3.2", jedis.hget("3", "r32"));
+            assertEquals("1", jedis.hget("1", "r14"));
+            assertEquals("2", jedis.hget("2", "r24"));
+            assertEquals("3", jedis.hget("3", "r34"));
+        });
 
-        assertEquals("1.2", jedis.hget("1", "r12"));
-        assertEquals("2.2", jedis.hget("2", "r22"));
-        assertEquals("3.2", jedis.hget("3", "r32"));
-        assertEquals("1", jedis.hget("1", "r14"));
-        assertEquals("2", jedis.hget("2", "r24"));
-        assertEquals("3", jedis.hget("3", "r34"));
     }
 
     @Test
@@ -295,8 +293,6 @@ public class RedisTableTest {
 
         executionEnv.setParallelism(1);
 
-        String address = "localhost:" + redisPort;
-
         DataStream<Row> source =
                 executionEnv.fromCollection(
                         Arrays.asList(
@@ -343,14 +339,15 @@ public class RedisTableTest {
 
         Thread.sleep(4000);
 
-        assertTrue(jedis.getbit("1", 2));
-        assertTrue(jedis.getbit("1", 4));
-
-        assertFalse(jedis.getbit("2", 2));
-        assertFalse(jedis.getbit("2", 4));
+        await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertTrue(jedis.getbit("1", 2));
+            assertTrue(jedis.getbit("1", 4));
+            assertFalse(jedis.getbit("2", 2));
+            assertFalse(jedis.getbit("2", 4));
+            assertFalse(jedis.getbit("3", 2));
+            assertFalse(jedis.getbit("3", 4));
+        });
 
-        assertFalse(jedis.getbit("3", 2));
-        assertFalse(jedis.getbit("3", 4));
     }
 
 }

Reply via email to