EMsnap commented on code in PR #9836: URL: https://github.com/apache/inlong/pull/9836#discussion_r1529610943
########## inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/redis/src/test/java/org/apache/inlong/sort/redis/RedisTableTest.java: ########## @@ -0,0 +1,622 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.redis; + +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.util.NetUtils; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import redis.clients.jedis.Jedis; +import redis.embedded.RedisServer; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; +import static org.junit.Assert.*; + +public class RedisTableTest { + + private static int redisPort; + + private static RedisServer redisServer; + + @BeforeClass + public static void setup() { + redisPort = NetUtils.getAvailablePort().getPort(); + // redisPort = 6379; + redisServer = new RedisServer(redisPort); + redisServer.start(); + } + + @AfterClass + public static void cleanup() { + 
if (redisServer != null) { + redisServer.stop(); + } + } + + @Before + public void prepare() { + Jedis jedis = new Jedis("localhost", redisPort); + // Deletes all keys from all databases. + jedis.flushAll(); + } + @Test + public void testSourceWithGet() { + StreamExecutionEnvironment executionEnv = + StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tableEnv = + StreamTableEnvironment.create(executionEnv); + Jedis jedis = new Jedis("127.0.0.1", redisPort); + + jedis.set("1", "value_1"); + jedis.set("2", "value_2"); + + String dim = "CREATE TABLE dim (" + + " aaa varchar, bbb varchar" + + // " PRIMARY KEY (`key`) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'redis-inlong'," + + " 'command' = 'get'," + + " 'host' = 'localhost'," + + " 'port' = '" + redisPort + "'," + + " 'maxIdle' = '8'," + + " 'minIdle' = '1'," + + " 'maxTotal' = '2'," + + " 'timeout' = '2000'" + + ")"; + String source = + "create table source(aaa varchar, proctime as procTime()) " + + "with ('connector'='datagen', 'rows-per-second'='1', " + + "'fields.aaa.kind'='sequence', 'fields.aaa.start'='1', 'fields.aaa.end'='2'" + + ")"; + String sink = "CREATE TABLE sink (" + + " aaa STRING," + + " bbb STRING," + + " PRIMARY KEY (`aaa`) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'redis-inlong'," + + " 'sink.batch-size' = '1'," + + " 'format' = 'csv'," + + " 'data-type' = 'PLAIN'," + + " 'redis-mode' = 'standalone'," + + " 'host' = '127.0.0.1'," + + " 'port' = '" + redisPort + "'," + + " 'maxIdle' = '8'," + + " 'minIdle' = '1'," + + " 'maxTotal' = '2'," + + " 'timeout' = '2000'" + + ")"; + + tableEnv.executeSql(dim); + tableEnv.executeSql(source); + tableEnv.executeSql(sink); + String sql = + " insert into sink" + + " select concat_ws('_', s.aaa, s.aaa),concat_ws('_', d.bbb, s.aaa) from source s" + + " left join dim for system_time as of s.proctime as d " + + " on d.aaa = s.aaa"; + + tableEnv.executeSql(sql); + + await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> { 
+ assertEquals("value_1_1", jedis.get("1_1")); + assertEquals("value_2_2", jedis.get("2_2")); + }); + // + } + + /** + * hget only support get data from the given one key + */ + @Test + public void testSourceWithHget() { + StreamExecutionEnvironment executionEnv = + StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tableEnv = + StreamTableEnvironment.create(executionEnv); + Jedis jedis = new Jedis("127.0.0.1", redisPort); + + jedis.hset("1", "1", "value_1"); + jedis.hset("1", "2", "value_2"); + + String dim = "CREATE TABLE dim (" + + " aaa varchar, bbb varchar" + + ") WITH (" + + " 'connector' = 'redis-inlong'," + + " 'command' = 'hget'," + + " 'host' = 'localhost'," + + " 'port' = '" + redisPort + "'," + + " 'maxIdle' = '8'," + + " 'minIdle' = '1'," + + " 'maxTotal' = '2'," + + " 'timeout' = '2000'," + + " 'additional.key' = '1'" + + ")"; + String source = + "create table source(aaa varchar,bbb varchar, proctime as procTime()) " + + "with ('connector'='datagen', 'rows-per-second'='1', " + + "'fields.aaa.kind'='sequence', 'fields.aaa.start'='1', 'fields.aaa.end'='2'" + + ")"; + String sink = "CREATE TABLE sink (" + + " aaa STRING," + + " bbb STRING," + + " ccc varchar," + + " PRIMARY KEY (`aaa`) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'redis-inlong'," + + " 'sink.batch-size' = '1'," + + " 'format' = 'csv'," + + " 'data-type' = 'HASH'," + + " 'redis-mode' = 'standalone'," + + " 'host' = '127.0.0.1'," + + " 'port' = '" + redisPort + "'," + + " 'maxIdle' = '8'," + + " 'minIdle' = '1'," + + " 'maxTotal' = '2'," + + " 'timeout' = '2000'" + + ")"; + + tableEnv.executeSql(dim); + tableEnv.executeSql(source); + tableEnv.executeSql(sink); + String sql = + " insert into sink" + + " select '1_1',d.aaa, d.bbb from source s" + + " left join dim for system_time as of s.proctime as d " + + " on d.aaa = s.aaa and d.bbb = s.bbb"; + String testSql = " select concat_ws('_', s.aaa, s.aaa),d.bbb from source s" Review Comment: testSql is not tested;
please add a test case for it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org