emptyOVO commented on code in PR #10801: URL: https://github.com/apache/inlong/pull/10801#discussion_r1725479053
########## inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java: ########## @@ -84,6 +181,278 @@ public boolean sourceFinish() { @Override public boolean sourceExist() { - return false; + return true; + } + + private String getRedisUri() { + StringBuffer sb = new StringBuffer("redis://"); + sb.append(hostName).append(":").append(port); + sb.append("?"); + if (!StringUtils.isEmpty(authPassword)) { + sb.append("authPassword=").append(authPassword).append("&"); + } + if (!StringUtils.isEmpty(authUser)) { + sb.append("authUser=").append(authUser).append("&"); + } + if (!StringUtils.isEmpty(readTimeout)) { + sb.append("readTimeout=").append(readTimeout).append("&"); + } + if (ssl) { + sb.append("ssl=").append("yes").append("&"); + } + if (!StringUtils.isEmpty(snapShot)) { + sb.append("replOffset=").append(snapShot).append("&"); + } + if (!StringUtils.isEmpty(replId)) { + sb.append("replId=").append(replId).append("&"); + } + if (sb.charAt(sb.length() - 1) == '?' 
|| sb.charAt(sb.length() - 1) == '&') { + sb.deleteCharAt(sb.length() - 1); + } + return sb.toString(); + } + + private void initReplicator() { + DefaultCommandParser defaultCommandParser = new DefaultCommandParser(); + redisReplicator.addCommandParser(CommandName.name("APPEND"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("SET"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("SETEX"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("MSET"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("DEL"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("SADD"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("HMSET"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("HSET"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("LSET"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("EXPIRE"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("EXPIREAT"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("GETSET"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("HSETNX"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("MSETNX"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("PSETEX"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("SETNX"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("SETRANGE"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("HDEL"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("LPOP"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("LPUSH"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("LPUSHX"), 
defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("LRem"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("RPOP"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("RPUSH"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("RPUSHX"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("ZREM"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("RENAME"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("INCR"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("DECR"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("INCRBY"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("DECRBY"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("PERSIST"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("SELECT"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("FLUSHALL"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("FLUSHDB"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("HINCRBY"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("ZINCRBY"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("MOVE"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("SMOVE"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("PFADD"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("PFCOUNT"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("PFMERGE"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("SDIFFSTORE"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("SINTERSTORE"), 
defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("SUNIONSTORE"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("ZADD"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("ZINTERSTORE"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("ZUNIONSTORE"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("BRPOPLPUSH"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("LINSERT"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("RENAMENX"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("RESTORE"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("PEXPIRE"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("PEXPIREAT"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("GEOADD"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("EVAL"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("EVALSHA"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("SCRIPT"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("PUBLISH"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("BITOP"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("BITFIELD"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("SETBIT"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("SREM"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("UNLINK"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("SWAPDB"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("MULTI"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("EXEC"), 
defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYSCORE"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYRANK"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYLEX"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("LTRIM"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("SORT"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("RPOPLPUSH"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("ZPOPMIN"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("ZPOPMAX"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("REPLCONF"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("XACK"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("XADD"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("XCLAIM"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("XDEL"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("XGROUP"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("XTRIM"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("XSETID"), defaultCommandParser); + // since redis 6.2 + redisReplicator.addCommandParser(CommandName.name("COPY"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("LMOVE"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("BLMOVE"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("ZDIFFSTORE"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("GEOSEARCHSTORE"), defaultCommandParser); + // since redis 7.0 + redisReplicator.addCommandParser(CommandName.name("SPUBLISH"), defaultCommandParser); + 
redisReplicator.addCommandParser(CommandName.name("FUNCTION"), defaultCommandParser); + // add EventListener + redisReplicator.addEventListener((replicator, event) -> { Review Comment: > I hope we can also specify Redis commands to collect the data we want, rather than just synchronizing all Redis data. Is it a good idea to use Jedis on top of the redis replicator to operate and filter out the data we need? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org