emptyOVO commented on code in PR #10801: URL: https://github.com/apache/inlong/pull/10801#discussion_r1721832043
########## inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java: ########## @@ -84,6 +181,278 @@ public boolean sourceFinish() { @Override public boolean sourceExist() { - return false; + return true; + } + + private String getRedisUri() { + StringBuffer sb = new StringBuffer("redis://"); + sb.append(hostName).append(":").append(port); + sb.append("?"); + if (!StringUtils.isEmpty(authPassword)) { + sb.append("authPassword=").append(authPassword).append("&"); + } + if (!StringUtils.isEmpty(authUser)) { + sb.append("authUser=").append(authUser).append("&"); + } + if (!StringUtils.isEmpty(readTimeout)) { + sb.append("readTimeout=").append(readTimeout).append("&"); + } + if (ssl) { + sb.append("ssl=").append("yes").append("&"); + } + if (!StringUtils.isEmpty(snapShot)) { + sb.append("replOffset=").append(snapShot).append("&"); + } + if (!StringUtils.isEmpty(replId)) { + sb.append("replId=").append(replId).append("&"); + } + if (sb.charAt(sb.length() - 1) == '?' || sb.charAt(sb.length() - 1) == '&') { + sb.deleteCharAt(sb.length() - 1); + } + return sb.toString(); + } + + private void initReplicator() { + DefaultCommandParser defaultCommandParser = new DefaultCommandParser(); + redisReplicator.addCommandParser(CommandName.name("APPEND"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("SET"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("SETEX"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("MSET"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("DEL"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("SADD"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("HMSET"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("HSET"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("LSET"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("EXPIRE"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("EXPIREAT"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("GETSET"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("HSETNX"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("MSETNX"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("PSETEX"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("SETNX"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("SETRANGE"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("HDEL"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("LPOP"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("LPUSH"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("LPUSHX"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("LRem"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("RPOP"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("RPUSH"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("RPUSHX"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("ZREM"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("RENAME"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("INCR"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("DECR"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("INCRBY"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("DECRBY"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("PERSIST"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("SELECT"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("FLUSHALL"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("FLUSHDB"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("HINCRBY"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("ZINCRBY"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("MOVE"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("SMOVE"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("PFADD"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("PFCOUNT"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("PFMERGE"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("SDIFFSTORE"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("SINTERSTORE"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("SUNIONSTORE"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("ZADD"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("ZINTERSTORE"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("ZUNIONSTORE"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("BRPOPLPUSH"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("LINSERT"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("RENAMENX"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("RESTORE"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("PEXPIRE"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("PEXPIREAT"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("GEOADD"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("EVAL"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("EVALSHA"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("SCRIPT"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("PUBLISH"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("BITOP"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("BITFIELD"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("SETBIT"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("SREM"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("UNLINK"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("SWAPDB"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("MULTI"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("EXEC"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYSCORE"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYRANK"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYLEX"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("LTRIM"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("SORT"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("RPOPLPUSH"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("ZPOPMIN"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("ZPOPMAX"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("REPLCONF"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("XACK"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("XADD"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("XCLAIM"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("XDEL"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("XGROUP"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("XTRIM"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("XSETID"), defaultCommandParser); + // since redis 6.2 + redisReplicator.addCommandParser(CommandName.name("COPY"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("LMOVE"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("BLMOVE"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("ZDIFFSTORE"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("GEOSEARCHSTORE"), defaultCommandParser); + // since redis 7.0 + redisReplicator.addCommandParser(CommandName.name("SPUBLISH"), defaultCommandParser); + redisReplicator.addCommandParser(CommandName.name("FUNCTION"), defaultCommandParser); + // add EventListener + redisReplicator.addEventListener((replicator, event) -> { Review Comment: Sorry, Maybe I didn't make that clear in the description, do you mean that EventListener will only obtain redis data that is added after startup ? when RedisReplicator connect to redis server it will first synchronizes the rdb file, then PostRdbSyncEvent event is triggered which means historical data of Redis is synchronized completedly, then the redisReplicator switches to command incremental synchronization mode and captures every command that occurs in the Redis instance (such as SET, DEL, etc.) and synchronizes it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org