This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 6a00e0cea8 [INLONG-9034][Sort] Fix sort redis test with incorrect use of sleep (#9327) 6a00e0cea8 is described below commit 6a00e0cea8853f0faa6e99cfaccd1a648c51cea9 Author: Sting <zpen...@connect.ust.hk> AuthorDate: Fri Nov 24 10:29:12 2023 +0800 [INLONG-9034][Sort] Fix sort redis test with incorrect use of sleep (#9327) --- .../sort-flink-v1.13/sort-connectors/redis/pom.xml | 4 ++ .../apache/inlong/sort/redis/RedisTableTest.java | 83 +++++++++++----------- 2 files changed, 44 insertions(+), 43 deletions(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/redis/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/redis/pom.xml index 81af253242..fa6c4e7c2a 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/redis/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/redis/pom.xml @@ -120,6 +120,10 @@ <artifactId>flink-clients_2.11</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + </dependency> </dependencies> <build> diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/redis/src/test/java/org/apache/inlong/sort/redis/RedisTableTest.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/redis/src/test/java/org/apache/inlong/sort/redis/RedisTableTest.java index 4f393c5652..ec869e3eef 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/redis/src/test/java/org/apache/inlong/sort/redis/RedisTableTest.java +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/redis/src/test/java/org/apache/inlong/sort/redis/RedisTableTest.java @@ -30,7 +30,9 @@ import redis.clients.jedis.Jedis; import redis.embedded.RedisServer; import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import static 
org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; @@ -64,7 +66,7 @@ public class RedisTableTest { } @Test - public void testSinkWithPlain() throws Exception { + public void testSinkWithPlain() { StreamExecutionEnvironment executionEnv = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = @@ -72,8 +74,6 @@ public class RedisTableTest { executionEnv.setParallelism(1); - String address = "localhost:" + redisPort; - DataStream<Row> source = executionEnv.fromCollection( Arrays.asList( @@ -109,15 +109,16 @@ public class RedisTableTest { String query = "INSERT INTO sink SELECT * FROM source"; tableEnv.executeSql(query); - Thread.sleep(4000); + await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals("r12,1.2,1", jedis.get("1")); + assertEquals("r22,2.2,2", jedis.get("2")); + assertEquals("r32,3.2,3", jedis.get("3")); + }); - assertEquals("r12,1.2,1", jedis.get("1")); - assertEquals("r22,2.2,2", jedis.get("2")); - assertEquals("r32,3.2,3", jedis.get("3")); } @Test - public void testSinkWithHashPrefixMatch() throws Exception { + public void testSinkWithHashPrefixMatch() { StreamExecutionEnvironment executionEnv = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = @@ -125,8 +126,6 @@ public class RedisTableTest { executionEnv.setParallelism(1); - String address = "localhost:" + redisPort; - DataStream<Row> source = executionEnv.fromCollection( Arrays.asList( @@ -162,15 +161,16 @@ public class RedisTableTest { String query = "INSERT INTO sink SELECT * FROM source"; tableEnv.executeSql(query); - Thread.sleep(4000); + await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals("1.2,1", jedis.hget("1", "r12")); + assertEquals("2.2,2", jedis.hget("2", "r22")); + assertEquals("3.2,3", jedis.hget("3", "r32")); + }); - assertEquals("1.2,1", jedis.hget("1", "r12")); - assertEquals("2.2,2", jedis.hget("2", "r22")); - assertEquals("3.2,3", jedis.hget("3", 
"r32")); } @Test - public void testSinkWithHashKvPair() throws Exception { + public void testSinkWithHashKvPair() { StreamExecutionEnvironment executionEnv = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = @@ -178,8 +178,6 @@ public class RedisTableTest { executionEnv.setParallelism(1); - String address = "localhost:" + redisPort; - DataStream<Row> source = executionEnv.fromCollection( Arrays.asList( @@ -217,18 +215,19 @@ public class RedisTableTest { String query = "INSERT INTO sink SELECT aaa,bbb,cast(ccc as STRING),ddd, cast(eee as STRING) FROM source"; tableEnv.executeSql(query); - Thread.sleep(4000); + await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals("1.2", jedis.hget("1", "r12")); + assertEquals("2.2", jedis.hget("2", "r22")); + assertEquals("3.2", jedis.hget("3", "r32")); + assertEquals("1", jedis.hget("1", "r14")); + assertEquals("2", jedis.hget("2", "r24")); + assertEquals("3", jedis.hget("3", "r34")); + }); - assertEquals("1.2", jedis.hget("1", "r12")); - assertEquals("2.2", jedis.hget("2", "r22")); - assertEquals("3.2", jedis.hget("3", "r32")); - assertEquals("1", jedis.hget("1", "r14")); - assertEquals("2", jedis.hget("2", "r24")); - assertEquals("3", jedis.hget("3", "r34")); } @Test - public void testSinkWithDynamic() throws Exception { + public void testSinkWithDynamic() { StreamExecutionEnvironment executionEnv = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = @@ -236,8 +235,6 @@ public class RedisTableTest { executionEnv.setParallelism(1); - String address = "localhost:" + redisPort; - DataStream<Row> source = executionEnv.fromCollection( Arrays.asList( @@ -276,14 +273,15 @@ public class RedisTableTest { + "FROM source"; tableEnv.executeSql(query); - Thread.sleep(4000); + await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals("1.2", jedis.hget("1", "r12")); + assertEquals("2.2", jedis.hget("2", "r22")); + assertEquals("3.2", 
jedis.hget("3", "r32")); + assertEquals("1", jedis.hget("1", "r14")); + assertEquals("2", jedis.hget("2", "r24")); + assertEquals("3", jedis.hget("3", "r34")); + }); - assertEquals("1.2", jedis.hget("1", "r12")); - assertEquals("2.2", jedis.hget("2", "r22")); - assertEquals("3.2", jedis.hget("3", "r32")); - assertEquals("1", jedis.hget("1", "r14")); - assertEquals("2", jedis.hget("2", "r24")); - assertEquals("3", jedis.hget("3", "r34")); } @Test @@ -295,8 +293,6 @@ public class RedisTableTest { executionEnv.setParallelism(1); - String address = "localhost:" + redisPort; - DataStream<Row> source = executionEnv.fromCollection( Arrays.asList( @@ -343,14 +339,15 @@ public class RedisTableTest { Thread.sleep(4000); - assertTrue(jedis.getbit("1", 2)); - assertTrue(jedis.getbit("1", 4)); - - assertFalse(jedis.getbit("2", 2)); - assertFalse(jedis.getbit("2", 4)); + await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> { + assertTrue(jedis.getbit("1", 2)); + assertTrue(jedis.getbit("1", 4)); + assertFalse(jedis.getbit("2", 2)); + assertFalse(jedis.getbit("2", 4)); + assertFalse(jedis.getbit("3", 2)); + assertFalse(jedis.getbit("3", 4)); + }); - assertFalse(jedis.getbit("3", 2)); - assertFalse(jedis.getbit("3", 4)); } }