emptyOVO commented on code in PR #10801:
URL: https://github.com/apache/inlong/pull/10801#discussion_r1725479053


##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java:
##########
@@ -84,6 +181,278 @@ public boolean sourceFinish() {
 
     @Override
     public boolean sourceExist() {
-        return false;
+        return true;
+    }
+
+    private String getRedisUri() {
+        StringBuffer sb = new StringBuffer("redis://");
+        sb.append(hostName).append(":").append(port);
+        sb.append("?");
+        if (!StringUtils.isEmpty(authPassword)) {
+            sb.append("authPassword=").append(authPassword).append("&");
+        }
+        if (!StringUtils.isEmpty(authUser)) {
+            sb.append("authUser=").append(authUser).append("&");
+        }
+        if (!StringUtils.isEmpty(readTimeout)) {
+            sb.append("readTimeout=").append(readTimeout).append("&");
+        }
+        if (ssl) {
+            sb.append("ssl=").append("yes").append("&");
+        }
+        if (!StringUtils.isEmpty(snapShot)) {
+            sb.append("replOffset=").append(snapShot).append("&");
+        }
+        if (!StringUtils.isEmpty(replId)) {
+            sb.append("replId=").append(replId).append("&");
+        }
+        if (sb.charAt(sb.length() - 1) == '?' || sb.charAt(sb.length() - 1) == 
'&') {
+            sb.deleteCharAt(sb.length() - 1);
+        }
+        return sb.toString();
+    }
+
+    private void initReplicator() {
+        DefaultCommandParser defaultCommandParser = new DefaultCommandParser();
+        redisReplicator.addCommandParser(CommandName.name("APPEND"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("SET"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("SETEX"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("MSET"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("DEL"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("SADD"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("HMSET"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("HSET"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("LSET"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("EXPIRE"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("EXPIREAT"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("GETSET"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("HSETNX"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("MSETNX"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("PSETEX"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("SETNX"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("SETRANGE"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("HDEL"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("LPOP"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("LPUSH"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("LPUSHX"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("LRem"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("RPOP"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("RPUSH"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("RPUSHX"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("ZREM"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("RENAME"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("INCR"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("DECR"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("INCRBY"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("DECRBY"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("PERSIST"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("SELECT"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("FLUSHALL"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("FLUSHDB"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("HINCRBY"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("ZINCRBY"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("MOVE"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("SMOVE"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("PFADD"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("PFCOUNT"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("PFMERGE"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("SDIFFSTORE"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("SINTERSTORE"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("SUNIONSTORE"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("ZADD"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("ZINTERSTORE"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("ZUNIONSTORE"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("BRPOPLPUSH"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("LINSERT"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("RENAMENX"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("RESTORE"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("PEXPIRE"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("PEXPIREAT"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("GEOADD"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("EVAL"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("EVALSHA"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("SCRIPT"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("PUBLISH"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("BITOP"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("BITFIELD"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("SETBIT"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("SREM"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("UNLINK"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("SWAPDB"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("MULTI"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("EXEC"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYSCORE"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYRANK"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYLEX"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("LTRIM"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("SORT"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("RPOPLPUSH"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("ZPOPMIN"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("ZPOPMAX"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("REPLCONF"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("XACK"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("XADD"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("XCLAIM"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("XDEL"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("XGROUP"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("XTRIM"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("XSETID"), 
defaultCommandParser);
+        // since redis 6.2
+        redisReplicator.addCommandParser(CommandName.name("COPY"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("LMOVE"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("BLMOVE"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("ZDIFFSTORE"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("GEOSEARCHSTORE"), 
defaultCommandParser);
+        // since redis 7.0
+        redisReplicator.addCommandParser(CommandName.name("SPUBLISH"), 
defaultCommandParser);
+        redisReplicator.addCommandParser(CommandName.name("FUNCTION"), 
defaultCommandParser);
+        // add EventListener
+        redisReplicator.addEventListener((replicator, event) -> {

Review Comment:
   > I hope we can also specify Redis commands to collect the data we want, 
rather than just synchronizing all Redis data.
   
   Is it a good idea to use Jedis on top of the redis replicator to operate and 
filter out the data we need ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to