This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new cc82864873 [INLONG-10287][Agent] Update the Redis Source (#11084)
cc82864873 is described below

commit cc828648738e60e7796546a95bb95f94d2e90844
Author: emptyOVO <118812562+empty...@users.noreply.github.com>
AuthorDate: Tue Oct 8 18:58:53 2024 +0800

    [INLONG-10287][Agent] Update the Redis Source (#11084)
---
 .../inlong/agent/constant/TaskConstants.java       |  16 +
 .../org/apache/inlong/agent/pojo/RedisTask.java    |  15 +-
 .../apache/inlong/agent/pojo/TaskProfileDto.java   |  10 +-
 inlong-agent/agent-plugins/pom.xml                 |   4 +
 .../agent/plugin/instance/RedisInstance.java}      |  31 +-
 .../inlong/agent/plugin/sources/RedisSource.java   | 592 ++++++++++++++++++++-
 .../apache/inlong/agent/plugin/task/RedisTask.java |  78 +++
 .../agent/plugin/sources/TestRedisSource.java      | 259 +++++++++
 8 files changed, 967 insertions(+), 38 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index 9398d70640..fbf0b5b705 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -178,6 +178,22 @@ public class TaskConstants extends CommonConstants {
     public static final String TASK_SQLSERVER_UNIX_TIMESTAMP_FORMAT_ENABLE =
             "task.sqlserverTask.unixTimestampFormatEnable";
 
+    public static final String TASK_REDIS_PORT = "task.redisTask.port";
+    public static final String TASK_REDIS_HOSTNAME = "task.redisTask.hostname";
+    public static final String TASK_REDIS_SSL = "task.redisTask.ssl";
+    public static final String TASK_REDIS_AUTHUSER = "task.redisTask.authUser";
+    public static final String TASK_REDIS_AUTHPASSWORD = 
"task.redisTask.authPassword";
+    public static final String TASK_REDIS_READTIMEOUT = 
"task.redisTask.readTimeout";
+    public static final String TASK_REDIS_REPLID = "task.redisTask.replId";
+    public static final String TASK_REDIS_OFFSET = "task.redisTask.offset";
+    public static final String TASK_REDIS_DB_NUMBER = 
"task.redisTask.dbNumber";
+    public static final String TASK_REDIS_COMMAND = "task.redisTask.command";
+    public static final String TASK_REDIS_KEYS = "task.redisTask.keys";
+    public static final String TASK_REDIS_FIELD_OR_MEMBER = 
"task.redisTask.fieldOrMember";
+    public static final String TASK_REDIS_IS_SUBSCRIBE = 
"task.redisTask.isSubscribe";
+    public static final String TASK_REDIS_SUBOPERATION = 
"task.redisTask.subOperation";
+    public static final String TASK_REDIS_SYNC_FREQ = 
"task.redisTask.syncFreq";
+
     public static final String TASK_STATE = "task.state";
 
     public static final String INSTANCE_STATE = "instance.state";
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/RedisTask.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/RedisTask.java
index 701bea2b8e..2b8a5c4ca6 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/RedisTask.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/RedisTask.java
@@ -18,7 +18,6 @@
 package org.apache.inlong.agent.pojo;
 
 import lombok.Data;
-
 @Data
 public class RedisTask {
 
@@ -30,6 +29,13 @@ public class RedisTask {
     private String readTimeout;
     private String queueSize;
     private String replId;
+    private String dbNumber;
+    private String command;
+    private String keys;
+    private String fieldOrMember;
+    private Boolean isSubscribe;
+    private String syncFreq;
+    private String subOperations;
 
     @Data
     public static class RedisTaskConfig {
@@ -42,5 +48,12 @@ public class RedisTask {
         private String timeout;
         private String queueSize;
         private String replId;
+        private String dbNumber;
+        private String command;
+        private String keys;
+        private String fieldOrMember;
+        private Boolean isSubscribe;
+        private String syncFreq;
+        private String subOperations;
     }
 }
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index bb602e2c61..1558bc042f 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -53,6 +53,7 @@ public class TaskProfileDto {
     public static final String DEFAULT_PULSAR_TASK = 
"org.apache.inlong.agent.plugin.task.PulsarTask";
     public static final String DEFAULT_MONGODB_TASK = 
"org.apache.inlong.agent.plugin.task.MongoDBTask";
     public static final String DEFAULT_ORACLE_TASK = 
"org.apache.inlong.agent.plugin.task.OracleTask";
+    public static final String DEFAULT_REDIS_TASK = 
"org.apache.inlong.agent.plugin.task.RedisTask";
     public static final String DEFAULT_POSTGRESQL_TASK = 
"org.apache.inlong.agent.plugin.task.PostgreSQLTask";
     public static final String DEFAULT_MQTT_TASK = 
"org.apache.inlong.agent.plugin.task.MqttTask";
     public static final String DEFAULT_SQLSERVER_TASK = 
"org.apache.inlong.agent.plugin.task.SQLServerTask";
@@ -274,8 +275,14 @@ public class TaskProfileDto {
         redisTask.setPort(config.getPort());
         redisTask.setSsl(config.getSsl());
         redisTask.setReadTimeout(config.getTimeout());
-        redisTask.setQueueSize(config.getQueueSize());
         redisTask.setReplId(config.getReplId());
+        redisTask.setCommand(config.getCommand());
+        redisTask.setDbNumber(config.getDbNumber());
+        redisTask.setKeys(config.getKeys());
+        redisTask.setFieldOrMember(config.getFieldOrMember());
+        redisTask.setIsSubscribe(config.getIsSubscribe());
+        redisTask.setSyncFreq(config.getSyncFreq());
+        redisTask.setSubOperations(config.getSubOperations());
 
         return redisTask;
     }
@@ -521,6 +528,7 @@ public class TaskProfileDto {
                 profileDto.setTask(task);
                 break;
             case REDIS:
+                task.setTaskClass(DEFAULT_REDIS_TASK);
                 RedisTask redisTask = getRedisTask(dataConfig);
                 task.setRedisTask(redisTask);
                 task.setSource(REDIS_SOURCE);
diff --git a/inlong-agent/agent-plugins/pom.xml 
b/inlong-agent/agent-plugins/pom.xml
index eb092bfed0..750141798b 100644
--- a/inlong-agent/agent-plugins/pom.xml
+++ b/inlong-agent/agent-plugins/pom.xml
@@ -42,6 +42,10 @@
             <artifactId>ojdbc8</artifactId>
             <version>${ojdbc.version}</version>
         </dependency>
+        <dependency>
+            <groupId>redis.clients</groupId>
+            <artifactId>jedis</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.inlong</groupId>
             <artifactId>agent-common</artifactId>
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/RedisTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/RedisInstance.java
similarity index 56%
copy from 
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/RedisTask.java
copy to 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/RedisInstance.java
index 701bea2b8e..f34c3db923 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/RedisTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/RedisInstance.java
@@ -15,32 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.pojo;
+package org.apache.inlong.agent.plugin.instance;
 
-import lombok.Data;
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
 
-@Data
-public class RedisTask {
+public class RedisInstance extends CommonInstance {
 
-    private String authUser;
-    private String authPassword;
-    private String hostname;
-    private String port;
-    private Boolean ssl;
-    private String readTimeout;
-    private String queueSize;
-    private String replId;
-
-    @Data
-    public static class RedisTaskConfig {
-
-        private String username;
-        private String password;
-        private String hostname;
-        private String port;
-        private Boolean ssl;
-        private String timeout;
-        private String queueSize;
-        private String replId;
+    @Override
+    public void setInodeInfo(InstanceProfile profile) {
+        profile.set(TaskConstants.INODE_INFO, "");
     }
 }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java
index b7fea66e4c..aada1bf504 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java
@@ -17,14 +17,53 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
+import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.InstanceProfile;
-import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
 
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.TypeAdapter;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import com.moilioncircle.redis.replicator.RedisReplicator;
+import com.moilioncircle.redis.replicator.Replicator;
+import com.moilioncircle.redis.replicator.cmd.CommandName;
+import com.moilioncircle.redis.replicator.cmd.impl.DefaultCommand;
+import com.moilioncircle.redis.replicator.cmd.parser.DefaultCommandParser;
+import com.moilioncircle.redis.replicator.event.PostRdbSyncEvent;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueHash;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueList;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueSet;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueString;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueZSet;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyValuePair;
+import com.moilioncircle.redis.replicator.rdb.datatype.ZSetEntry;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Pipeline;
+import redis.clients.jedis.exceptions.JedisConnectionException;
 
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Redis source
@@ -32,6 +71,50 @@ import java.util.List;
 public class RedisSource extends AbstractSource {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(RedisSource.class);
+    private static final long MAX_DATA_SIZE = 500 * 1024;
+    private static final int REDIS_QUEUE_SIZE = 10000;
+    private static final long DEFAULT_FREQ = 60 * 1000;
+    private static final String GET_COMMAND = "GET";
+    private static final String MGET_COMMAND = "MGET";
+    private static final String HGET_COMMAND = "HGET";
+    private static final String ZSCORE_COMMAND = "ZSCORE";
+    private static final String ZREVRANK_COMMAND = "ZREVRANK";
+    private static final String EXISTS_COMMAND = "EXISTS";
+    private Gson gson;
+
+    public InstanceProfile profile;
+    private String port;
+    private Jedis jedis;
+    private String hostName;
+    private boolean ssl;
+    private String authUser;
+    private String authPassword;
+    private String readTimeout;
+    private String replId;
+    private String snapShot;
+    private String dbNumber;
+    private String redisCommand;
+
+    private String fieldOrMember;
+    private boolean destroyed;
+    private boolean isSubscribe;
+    private Set<String> keys;
+    private Set<String> subOperations;
+    private Replicator redisReplicator;
+    private BlockingQueue<SourceData> redisQueue;
+    private ScheduledExecutorService executor;
+
+    // Command handler map
+    private static final Map<String, CommandHandler> commandHandlers = 
Maps.newConcurrentMap();
+
+    static {
+        commandHandlers.put(GET_COMMAND, RedisSource::handleGet);
+        commandHandlers.put(MGET_COMMAND, RedisSource::handleMGet);
+        commandHandlers.put(HGET_COMMAND, RedisSource::handleHGet);
+        commandHandlers.put(ZSCORE_COMMAND, RedisSource::handleZScore);
+        commandHandlers.put(ZREVRANK_COMMAND, RedisSource::handleZRevRank);
+        commandHandlers.put(EXISTS_COMMAND, RedisSource::handleExists);
+    }
 
     public RedisSource() {
 
@@ -39,32 +122,207 @@ public class RedisSource extends AbstractSource {
 
     @Override
     protected String getThreadName() {
-        return null;
+        return "redis-source-" + taskId + "-" + instanceId;
     }
 
     @Override
     protected void initSource(InstanceProfile profile) {
+        LOGGER.info("Redis Source init: {}", profile.toJsonStr());
+        this.port = profile.get(TaskConstants.TASK_REDIS_PORT);
+        this.hostName = profile.get(TaskConstants.TASK_REDIS_HOSTNAME);
+        this.ssl = profile.getBoolean(TaskConstants.TASK_REDIS_SSL, false);
+        this.authUser = profile.get(TaskConstants.TASK_REDIS_AUTHUSER, "");
+        this.authPassword = profile.get(TaskConstants.TASK_REDIS_AUTHPASSWORD, 
"");
+        this.readTimeout = profile.get(TaskConstants.TASK_REDIS_READTIMEOUT, 
"");
+        this.replId = profile.get(TaskConstants.TASK_REDIS_REPLID, "");
+        this.snapShot = profile.get(TaskConstants.TASK_REDIS_OFFSET, "-1");
+        this.dbNumber = profile.get(TaskConstants.TASK_REDIS_DB_NUMBER, "0");
+        this.keys = new 
ConcurrentSkipListSet<>(Arrays.asList(profile.get(TaskConstants.TASK_REDIS_KEYS).split(",")));
+        this.isSubscribe = 
profile.getBoolean(TaskConstants.TASK_REDIS_IS_SUBSCRIBE, false);
+        this.instanceId = profile.getInstanceId();
+        this.redisQueue = new LinkedBlockingQueue<>(REDIS_QUEUE_SIZE);
+        initGson();
+        String uri = getRedisUri();
+        try {
+            if (isSubscribe) {
+                // use subscribe mode
+                this.subOperations = new ConcurrentSkipListSet<>(
+                        
Arrays.asList(profile.get(TaskConstants.TASK_REDIS_SUBOPERATION).split(",")));
+                this.executor = (ScheduledExecutorService) 
Executors.newSingleThreadExecutor();
+                this.redisReplicator = new RedisReplicator(uri);
+                initReplicator();
+                this.executor.execute(startReplicatorSync());
+            } else {
+                this.executor = Executors.newScheduledThreadPool(1);
+                // use command mode
+                this.redisCommand = 
profile.get(TaskConstants.TASK_REDIS_COMMAND, GET_COMMAND);
+                this.fieldOrMember = 
profile.get(TaskConstants.TASK_REDIS_FIELD_OR_MEMBER, null);
+                // default frequency 1min
+                long syncFreq = 
profile.getLong(TaskConstants.TASK_REDIS_SYNC_FREQ, DEFAULT_FREQ);
+                this.jedis = new Jedis(uri);
+                jedis.connect();
+                this.executor.scheduleWithFixedDelay(startJedisSync(), 0, 
syncFreq, TimeUnit.MILLISECONDS);
+            }
+        } catch (URISyntaxException | IOException | JedisConnectionException 
e) {
+            sourceMetric.pluginReadFailCount.addAndGet(1);
+            LOGGER.error("Connect to redis {}:{} failed.", hostName, port, e);
+        }
+    }
 
+    private Runnable startReplicatorSync() {
+        return () -> {
+            AgentThreadFactory.nameThread(getThreadName() + "redis subscribe 
mode");
+            executor.execute(new Thread(() -> {
+                try {
+                    this.redisReplicator.open();
+                } catch (IOException e) {
+                    LOGGER.error("Redis source error, fail to start 
replicator", e);
+                }
+            }));
+        };
     }
 
-    @Override
-    protected void printCurrentState() {
+    private Runnable startJedisSync() {
+        return () -> {
+            AgentThreadFactory.nameThread(getThreadName() + "redis command 
mode");
+            executor.execute(new Thread(() -> {
+                Map<String, Object> dataMap =
+                        fetchDataByJedis(jedis, redisCommand, new 
ArrayList<>(keys), fieldOrMember);
+                synchronizeData(gson.toJson(dataMap));
+            }));
+        };
+    }
+
+    private Map<String, Object> fetchDataByJedis(Jedis jedis, String command, 
List<String> keys, String fieldOrMember) {
+        Map<String, Object> result = new HashMap<>();
+        CommandHandler handler = commandHandlers.get(command.toUpperCase());
+        if (handler != null) {
+            handler.handle(jedis, keys, fieldOrMember, result);
+        } else {
+            LOGGER.error("Unsupported command: " + command);
+            throw new UnsupportedOperationException("Unsupported command: " + 
command);
+        }
+        return result;
+    }
 
+    private static void handleGet(Jedis jedis, List<String> keys, String 
fieldOrMember, Map<String, Object> result) {
+        Pipeline pipeline = jedis.pipelined();
+        for (String key : keys) {
+            pipeline.get(key);
+        }
+        List<Object> getValues = pipeline.syncAndReturnAll();
+        for (int i = 0; i < keys.size(); i++) {
+            result.put(keys.get(i), getValues.get(i));
+        }
+    }
+
+    private static void handleMGet(Jedis jedis, List<String> keys, String 
fieldOrMember, Map<String, Object> result) {
+        List<String> mGetValues = jedis.mget(keys.toArray(new String[0]));
+        for (int i = 0; i < keys.size(); i++) {
+            result.put(keys.get(i), mGetValues.get(i));
+        }
+    }
+
+    private static void handleHGet(Jedis jedis, List<String> keys, String 
fieldOrMember, Map<String, Object> result) {
+        for (String key : keys) {
+            String value = jedis.hget(key, fieldOrMember);
+            result.put(key, value);
+        }
+    }
+
+    private static void handleZScore(Jedis jedis, List<String> keys, String 
fieldOrMember, Map<String, Object> result) {
+        for (String key : keys) {
+            if (!StringUtils.isEmpty(fieldOrMember)) {
+                Double score = jedis.zscore(key, fieldOrMember);
+                result.put(key, score);
+            }
+        }
+    }
+
+    private static void handleZRevRank(Jedis jedis, List<String> keys, String 
fieldOrMember,
+            Map<String, Object> result) {
+        for (String key : keys) {
+            if (!StringUtils.isEmpty(fieldOrMember)) {
+                Long rank = jedis.zrevrank(key, fieldOrMember);
+                result.put(key, rank);
+            }
+        }
+    }
+
+    private static void handleExists(Jedis jedis, List<String> keys, String 
fieldOrMember, Map<String, Object> result) {
+        for (String key : keys) {
+            boolean exists = jedis.exists(key);
+            result.put(key, exists);
+        }
+    }
+
+    // Functional interface for handling commands
+    @FunctionalInterface
+    private interface CommandHandler {
+
+        void handle(Jedis jedis, List<String> keys, String fieldOrMember, 
Map<String, Object> result);
+    }
+
+    private void synchronizeData(String data) {
+        try {
+            if (!StringUtils.isEmpty(data)) {
+                byte[] dataBytes = data.getBytes(StandardCharsets.UTF_8);
+                // limit data size
+                if (dataBytes.length <= MAX_DATA_SIZE) {
+                    SourceData sourceData = new SourceData(dataBytes, "0L");
+                    boolean offerSuc = false;
+                    while (isRunnable() && !offerSuc) {
+                        offerSuc = redisQueue.offer(sourceData, 1, 
TimeUnit.SECONDS);
+                    }
+                    AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, 
inlongGroupId, inlongStreamId,
+                            System.currentTimeMillis(), 1, data.length());
+                    sourceMetric.pluginReadCount.incrementAndGet();
+                } else {
+                    sourceMetric.pluginReadFailCount.incrementAndGet();
+                    LOGGER.warn("Read redis data warn, data overload, 
Automatically skip and discard");
+                }
+            }
+        } catch (InterruptedException e) {
+            sourceMetric.pluginReadFailCount.incrementAndGet();
+            LOGGER.error("Read redis data error", e);
+        }
     }
 
     @Override
-    protected boolean doPrepareToRead() {
-        return false;
+    protected void printCurrentState() {
+        if (isSubscribe) {
+            LOGGER.info("redis subscribe synchronization is {} on source {}",
+                    redisReplicator != null && !executor.isShutdown() ? 
"running" : "stop",
+                    hostName + ":" + port);
+        } else {
+            LOGGER.info("redis command synchronization is {} on source {}", 
!executor.isShutdown() ? "running" : "stop",
+                    hostName + ":" + port);
+        }
     }
 
     @Override
-    protected List<SourceData> readFromSource() {
-        return null;
+    protected boolean doPrepareToRead() {
+        return true;
     }
 
     @Override
-    public Message read() {
-        return null;
+    protected List<SourceData> readFromSource() {
+        List<SourceData> dataList = new ArrayList<>();
+        try {
+            int size = 0;
+            while (size < BATCH_READ_LINE_TOTAL_LEN) {
+                SourceData sourceData = redisQueue.poll(1, TimeUnit.SECONDS);
+                if (sourceData != null) {
+                    size += sourceData.getData().length;
+                    dataList.add(sourceData);
+                } else {
+                    break;
+                }
+            }
+        } catch (InterruptedException e) {
+            LOGGER.error("poll {} data from redis queue interrupted.", 
instanceId);
+        }
+        return dataList;
     }
 
     @Override
@@ -74,7 +332,23 @@ public class RedisSource extends AbstractSource {
 
     @Override
     protected void releaseSource() {
-
+        LOGGER.info("releasing redis source");
+        if (!destroyed) {
+            try {
+                executor.shutdown();
+                // subscribe mode then close replicator
+                if (redisReplicator != null) {
+                    redisReplicator.close();
+                }
+                // command mode then close jedis
+                if (jedis.isConnected()) {
+                    jedis.close();
+                }
+            } catch (IOException e) {
+                LOGGER.error("Redis reader close failed.");
+            }
+            destroyed = true;
+        }
     }
 
     @Override
@@ -84,6 +358,300 @@ public class RedisSource extends AbstractSource {
 
     @Override
     public boolean sourceExist() {
-        return false;
+        return true;
+    }
+
+    private String getRedisUri() {
+        StringBuffer sb = new StringBuffer("redis://");
+        sb.append(hostName).append(":").append(port);
+        if (!StringUtils.isEmpty(dbNumber)) {
+            sb.append("/").append(dbNumber);
+        }
+        sb.append("?");
+        if (!StringUtils.isEmpty(authPassword)) {
+            sb.append("authPassword=").append(authPassword).append("&");
+        }
+        if (!StringUtils.isEmpty(authUser)) {
+            sb.append("authUser=").append(authUser).append("&");
+        }
+        if (!StringUtils.isEmpty(readTimeout)) {
+            sb.append("readTimeout=").append(readTimeout).append("&");
+        }
+        if (ssl) {
+            sb.append("ssl=").append("yes").append("&");
+        }
+        if (!StringUtils.isEmpty(snapShot)) {
+            sb.append("replOffset=").append(snapShot).append("&");
+        }
+        if (!StringUtils.isEmpty(replId)) {
+            sb.append("replId=").append(replId).append("&");
+        }
+        if (sb.charAt(sb.length() - 1) == '?' || sb.charAt(sb.length() - 1) == 
'&') {
+            sb.deleteCharAt(sb.length() - 1);
+        }
+        return sb.toString();
+    }
+
+    private void initReplicator() {
+        if (!subOperations.isEmpty()) {
+            DefaultCommandParser replicatorCommandParser = new 
DefaultCommandParser();
+            for (String subOperation : subOperations) {
+                
this.redisReplicator.addCommandParser(CommandName.name(subOperation), 
replicatorCommandParser);
+            }
+            this.redisReplicator.addEventListener((replicator, event) -> {
+                if (event instanceof DefaultCommand) {
+                    DefaultCommand defaultCommand = (DefaultCommand) event;
+                    Object[] args = defaultCommand.getArgs();
+                    if (args[0] instanceof byte[]) {
+                        String key = new String((byte[]) args[0], 
StandardCharsets.UTF_8);
+                        if (keys.contains(key)) {
+                            synchronizeData(gson.toJson(event));
+                        }
+                    }
+                }
+                if (event instanceof PostRdbSyncEvent) {
+                    this.snapShot = 
String.valueOf(replicator.getConfiguration().getReplOffset());
+                    LOGGER.info("after rdb snapShot is: {}", snapShot);
+                }
+            });
+        } else {
+            // if SubOperation is not configured, subscribe all modification
+            initDefaultReplicator();
+        }
+    }
+
+    private void initDefaultReplicator() {
+        DefaultCommandParser defaultCommandParser = new DefaultCommandParser();
+        this.redisReplicator.addCommandParser(CommandName.name("APPEND"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("SET"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("SETEX"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("MSET"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("DEL"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("SADD"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("HMSET"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("HSET"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("LSET"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("EXPIRE"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("EXPIREAT"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("GETSET"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("HSETNX"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("PSETEX"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("SETNX"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("SETRANGE"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("HDEL"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("LPOP"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("LPUSH"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("LPUSHX"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("LRem"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("RPOP"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("RPUSH"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("RPUSHX"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("ZREM"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("RENAME"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("INCR"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("DECR"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("INCRBY"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("DECRBY"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("PERSIST"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("SELECT"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("FLUSHALL"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("FLUSHDB"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("HINCRBY"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("ZINCRBY"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("MOVE"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("SMOVE"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("PFADD"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("PFCOUNT"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("PFMERGE"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("SDIFFSTORE"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("SINTERSTORE"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("SUNIONSTORE"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("ZADD"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("ZINTERSTORE"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("ZUNIONSTORE"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("BRPOPLPUSH"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("LINSERT"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("RENAMENX"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("RESTORE"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("PEXPIRE"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("PEXPIREAT"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("GEOADD"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("EVAL"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("EVALSHA"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("SCRIPT"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("PUBLISH"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("BITOP"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("BITFIELD"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("SETBIT"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("SREM"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("UNLINK"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("SWAPDB"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("MULTI"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("EXEC"), 
defaultCommandParser);
+        
this.redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYSCORE"), 
defaultCommandParser);
+        
this.redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYRANK"), 
defaultCommandParser);
+        
this.redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYLEX"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("LTRIM"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("SORT"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("RPOPLPUSH"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("ZPOPMIN"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("ZPOPMAX"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("REPLCONF"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("XACK"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("XADD"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("XCLAIM"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("XDEL"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("XGROUP"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("XTRIM"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("XSETID"), 
defaultCommandParser);
+        // since redis 6.2
+        this.redisReplicator.addCommandParser(CommandName.name("COPY"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("LMOVE"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("BLMOVE"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("ZDIFFSTORE"), 
defaultCommandParser);
+        
this.redisReplicator.addCommandParser(CommandName.name("GEOSEARCHSTORE"), 
defaultCommandParser);
+        // since redis 7.0
+        this.redisReplicator.addCommandParser(CommandName.name("SPUBLISH"), 
defaultCommandParser);
+        this.redisReplicator.addCommandParser(CommandName.name("FUNCTION"), 
defaultCommandParser);
+        // add EventListener
+        this.redisReplicator.addEventListener((replicator, event) -> {
+            if (event instanceof KeyValuePair<?, ?> || event instanceof 
DefaultCommand) {
+                KeyValuePair<?, ?> kvEvent = (KeyValuePair<?, ?>) event;
+                String key = kvEvent.getKey().toString();
+                if (keys.contains(key)) {
+                    synchronizeData(gson.toJson(event));
+                }
+            }
+            if (event instanceof PostRdbSyncEvent) {
+                this.snapShot = 
String.valueOf(replicator.getConfiguration().getReplOffset());
+                LOGGER.info("after rdb snapShot is: {}", snapShot);
+            }
+        });
+    }
+
+    /**
+     * init GSON parser
+     */
+    private void initGson() {
+        this.gson =
+                new 
GsonBuilder().registerTypeAdapter(KeyStringValueHash.class, new 
TypeAdapter<KeyStringValueHash>() {
+
+                    @Override
+                    public void write(JsonWriter out, KeyStringValueHash kv) 
throws IOException {
+                        out.beginObject();
+                        out.name("DB").beginObject();
+                        out.name("dbNumber").value(kv.getDb().getDbNumber());
+                        out.name("dbSize").value(kv.getDb().getDbsize());
+                        out.name("expires").value(kv.getDb().getExpires());
+                        out.endObject();
+                        out.name("valueRdbType").value(kv.getValueRdbType());
+                        out.name("key").value(new String(kv.getKey()));
+                        out.name("value").beginObject();
+                        for (byte[] b : kv.getValue().keySet()) {
+                            out.name(new String(b)).value(new 
String(kv.getValue().get(b)));
+                        }
+                        out.endObject();
+                        out.endObject();
+                    }
+
+                    @Override
+                    public KeyStringValueHash read(JsonReader in) throws 
IOException {
+                        return null;
+                    }
+                }).registerTypeAdapter(DefaultCommand.class, new 
TypeAdapter<DefaultCommand>() {
+
+                    @Override
+                    public void write(JsonWriter out, DefaultCommand dc) 
throws IOException {
+                        out.beginObject();
+                        out.name("key").value(new String(dc.getCommand()));
+                        out.name("value").beginArray();
+                        for (byte[] bytes : dc.getArgs()) {
+                            out.value(new String(bytes));
+                        }
+                        out.endArray();
+                        out.endObject();
+                    }
+
+                    @Override
+                    public DefaultCommand read(JsonReader in) throws 
IOException {
+                        return null;
+                    }
+                })
+                        .registerTypeAdapter(KeyStringValueList.class, new 
TypeAdapter<KeyStringValueList>() {
+
+                            @Override
+                            public void write(JsonWriter out, 
KeyStringValueList kv) throws IOException {
+                                out.beginObject();
+                                out.name("key").value(new String(kv.getKey()));
+                                out.name("value").beginArray();
+                                for (byte[] bytes : kv.getValue()) {
+                                    out.value(new String(bytes));
+                                }
+                                out.endArray();
+                                out.endObject();
+                            }
+
+                            @Override
+                            public KeyStringValueList read(JsonReader in) 
throws IOException {
+                                return null;
+                            }
+                        })
+                        .registerTypeAdapter(KeyStringValueSet.class, new 
TypeAdapter<KeyStringValueSet>() {
+
+                            @Override
+                            public void write(JsonWriter out, 
KeyStringValueSet kv) throws IOException {
+                                out.beginObject();
+                                out.name("key").value(new String(kv.getKey()));
+                                out.name("value").beginArray();
+                                for (byte[] bytes : kv.getValue()) {
+                                    out.value(new String(bytes));
+                                }
+                                out.endArray();
+                                out.endObject();
+                            }
+
+                            @Override
+                            public KeyStringValueSet read(JsonReader in) 
throws IOException {
+                                return null;
+                            }
+                        })
+                        .registerTypeAdapter(KeyStringValueString.class, new 
TypeAdapter<KeyStringValueString>() {
+
+                            @Override
+                            public void write(JsonWriter out, 
KeyStringValueString kv) throws IOException {
+                                out.beginObject();
+                                out.name("key").value(new String(kv.getKey()));
+                                out.name("value").value(new 
String(kv.getValue()));
+                                out.endObject();
+                            }
+
+                            @Override
+                            public KeyStringValueString read(JsonReader in) 
throws IOException {
+                                return null;
+                            }
+                        })
+                        .registerTypeAdapter(KeyStringValueZSet.class, new 
TypeAdapter<KeyStringValueZSet>() {
+
+                            @Override
+                            public void write(JsonWriter out, 
KeyStringValueZSet kv) throws IOException {
+                                out.beginObject();
+                                out.name("key").value(new String(kv.getKey()));
+                                out.name("value").beginArray();
+                                for (ZSetEntry entry : kv.getValue()) {
+                                    out.beginObject();
+                                    out.name("element").value(new 
String(entry.getElement()));
+                                    out.name("score").value(entry.getScore());
+                                    out.endObject();
+                                }
+                                out.endArray();
+                                out.endObject();
+                            }
+
+                            @Override
+                            public KeyStringValueZSet read(JsonReader in) 
throws IOException {
+                                return null;
+                            }
+                        })
+                        .create();
     }
 }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/RedisTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/RedisTask.java
new file mode 100644
index 0000000000..b9f7449ecd
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/RedisTask.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.utils.AgentUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+
+public class RedisTask extends AbstractTask {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(RedisTask.class);
+    public static final String DEFAULT_REDIS_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.RedisInstance";
+    private boolean isAdded = false;
+    private String taskId;
+
+    private final DateTimeFormatter dateTimeFormatter = 
DateTimeFormatter.ofPattern("yyyyMMddHH");
+
+    @Override
+    public boolean isProfileValid(TaskProfile profile) {
+        if (!profile.allRequiredKeyExist()) {
+            LOGGER.error("task profile needs all required key");
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    protected int getInstanceLimit() {
+        return DEFAULT_INSTANCE_LIMIT;
+    }
+
+    @Override
+    protected void initTask() {
+        LOGGER.info("Redis commonInit: {}", taskProfile.toJsonStr());
+        this.taskId = taskProfile.get(TaskConstants.TASK_REDIS_REPLID)
+                + "-" + taskProfile.get(TaskConstants.TASK_REDIS_IS_SUBSCRIBE);
+    }
+
+    @Override
+    protected List<InstanceProfile> getNewInstanceList() {
+        List<InstanceProfile> list = new ArrayList<>();
+        if (isAdded) {
+            return list;
+        }
+        String dataTime = LocalDateTime.now().format(dateTimeFormatter);
+        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(DEFAULT_REDIS_INSTANCE, taskId,
+                CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
+        LOGGER.info("taskProfile.createInstanceProfile: {}", 
instanceProfile.toJsonStr());
+        list.add(instanceProfile);
+        this.isAdded = true;
+        return list;
+    }
+}
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java
new file mode 100644
index 0000000000..061a74c092
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.task.OffsetManager;
+import org.apache.inlong.agent.core.task.TaskManager;
+import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
+import org.apache.inlong.agent.store.Store;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.common.enums.TaskStateEnum;
+import org.apache.inlong.common.metric.MetricRegister;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Pipeline;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Test cases for {@link RedisSource}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Executors.class, RedisSource.class, MetricRegister.class})
+@PowerMockIgnore({"javax.management.*"})
+public class TestRedisSource {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TestRedisSource.class);
+
+    private static AgentBaseTestsHelper helper;
+
+    private final String instanceId = "s4bc475560b4444dbd4e9812ab1fd64d";
+
+    private static Store taskBasicStore;
+    private static Store instanceBasicStore;
+    private static Store offsetBasicStore;
+
+    @Mock
+    private InstanceProfile profile;
+
+    @Mock
+    private Jedis jedis;
+
+    @Mock
+    private Pipeline pipeline;
+
+    @Mock
+    private ScheduledExecutorService executor;
+
+    @InjectMocks
+    private RedisSource redisSource;
+
+    @Before
+    public void setUp() {
+        helper = new 
AgentBaseTestsHelper(UUID.randomUUID().toString()).setupAgentHome();
+        taskBasicStore = 
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK);
+        instanceBasicStore = 
TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE);
+        offsetBasicStore =
+                TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_OFFSET);
+        OffsetManager.init(taskBasicStore, instanceBasicStore, 
offsetBasicStore);
+        mockStatic(Executors.class);
+        when(Executors.newSingleThreadExecutor()).thenReturn(executor);
+        when(Executors.newScheduledThreadPool(1)).thenReturn(executor);
+        initProfile();
+    }
+
+    private void initProfile() {
+        final String username = "";
+        final String password = "123456";
+        final String hostname = "127.0.0.1";
+        final String port = "6379";
+        final String groupId = "group01";
+        final String streamId = "stream01";
+        final String keys = "age,name,sex";
+        final String command = "zscore";
+        final String subOperation = "set,del";
+
+        TaskProfile taskProfile = helper.getTaskProfile(1, "", "csv", false, 
0L, 0L, TaskStateEnum.RUNNING, "D",
+                "GMT+8:00", null);
+        profile = taskProfile.createInstanceProfile("",
+                "", taskProfile.getCycleUnit(), "20240725", 
AgentUtils.getCurrentTime());
+        profile.set(CommonConstants.PROXY_INLONG_GROUP_ID, groupId);
+        profile.set(CommonConstants.PROXY_INLONG_STREAM_ID, streamId);
+        profile.set(TaskConstants.TASK_REDIS_AUTHUSER, username);
+        profile.set(TaskConstants.TASK_REDIS_AUTHPASSWORD, password);
+        profile.set(TaskConstants.TASK_REDIS_HOSTNAME, hostname);
+        profile.set(TaskConstants.TASK_REDIS_PORT, port);
+        profile.set(TaskConstants.TASK_REDIS_COMMAND, command);
+        profile.set(TaskConstants.TASK_REDIS_KEYS, keys);
+        profile.set(TaskConstants.TASK_AUDIT_VERSION, "0");
+        profile.set(TaskConstants.TASK_REDIS_SUBOPERATION, subOperation);
+        profile.setInstanceId(instanceId);
+    }
+
+    @Test
+    public void testJedisStartup() {
+        try {
+            profile.setBoolean(TaskConstants.TASK_REDIS_IS_SUBSCRIBE, false);
+            redisSource.initSource(profile);
+            redisSource.releaseSource();
+        } catch (Exception e) {
+        }
+    }
+
+    @Test
+    public void testReplicatorStartup() {
+        try {
+            profile.setBoolean(TaskConstants.TASK_REDIS_IS_SUBSCRIBE, true);
+            redisSource.initSource(profile);
+            redisSource.releaseSource();
+        } catch (Exception e) {
+            LOGGER.error(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testScheduledExecutorStartup() {
+        try {
+            profile.setBoolean(TaskConstants.TASK_REDIS_IS_SUBSCRIBE, false);
+            redisSource.initSource(profile);
+            verify(executor, 
times(1)).scheduleWithFixedDelay(any(Runnable.class), eq(0L), eq(60 * 1000L),
+                    eq(TimeUnit.MILLISECONDS));
+            redisSource.releaseSource();
+        } catch (Exception e) {
+        }
+    }
+
+    @Test
+    public void testFetchDataByJedis_Get()
+            throws NoSuchMethodException, InvocationTargetException, 
IllegalAccessException {
+        when(jedis.pipelined()).thenReturn(pipeline);
+        when(pipeline.syncAndReturnAll()).thenReturn(Arrays.asList("value1", 
"value2", "value3"));
+
+        List<String> keys = Arrays.asList("key1", "key2", "key3");
+
+        Map<String, Object> expectedData = new HashMap<>();
+        expectedData.put("key1", "value1");
+        expectedData.put("key2", "value2");
+        expectedData.put("key3", "value3");
+        Method method = 
RedisSource.class.getDeclaredMethod("fetchDataByJedis", Jedis.class, 
String.class, List.class,
+                String.class);
+        method.setAccessible(true);
+        Map<String, Object> result = (Map<String, Object>) 
method.invoke(redisSource, jedis, "GET", keys, null);
+        assertEquals(expectedData, result);
+        verify(jedis).pipelined();
+        verify(pipeline, times(3)).get(anyString());
+        verify(pipeline).syncAndReturnAll();
+        executor.shutdown();
+    }
+
+    @Test
+    public void testFetchDataByJedis_Mget()
+            throws NoSuchMethodException, InvocationTargetException, 
IllegalAccessException {
+        when(jedis.mget(eq("key1"), eq("key2"), 
eq("key3"))).thenReturn(Arrays.asList("value1", "value2", "value3"));
+        List<String> keys = Arrays.asList("key1", "key2", "key3");
+        Map<String, Object> expectedData = new HashMap<>();
+        expectedData.put("key1", "value1");
+        expectedData.put("key2", "value2");
+        expectedData.put("key3", "value3");
+        Method method = 
RedisSource.class.getDeclaredMethod("fetchDataByJedis", Jedis.class, 
String.class, List.class,
+                String.class);
+        method.setAccessible(true);
+        Map<String, Object> result = (Map<String, Object>) 
method.invoke(redisSource, jedis, "MGET", keys, null);
+        assertEquals(expectedData, result);
+        verify(jedis).mget(eq("key1"), eq("key2"), eq("key3"));
+        executor.shutdown();
+    }
+
+    @Test
+    public void testFetchDataByJedis_Hget()
+            throws NoSuchMethodException, InvocationTargetException, 
IllegalAccessException {
+        when(jedis.hget("key1", "field1")).thenReturn("hash_value1");
+        when(jedis.hget("key2", "field1")).thenReturn("hash_value2");
+        List<String> keys = Arrays.asList("key1", "key2");
+
+        Map<String, Object> expectedData = new HashMap<>();
+        expectedData.put("key1", "hash_value1");
+        expectedData.put("key2", "hash_value2");
+
+        Method method = 
RedisSource.class.getDeclaredMethod("fetchDataByJedis", Jedis.class, 
String.class, List.class,
+                String.class);
+        method.setAccessible(true);
+        Map<String, Object> result = (Map<String, Object>) 
method.invoke(redisSource, jedis, "HGET", keys, "field1");
+        assertEquals(expectedData, result);
+
+        verify(jedis, times(1)).hget("key1", "field1");
+        verify(jedis, times(1)).hget("key2", "field1");
+        executor.shutdown();
+    }
+
+    @Test
+    public void testFetchDataByJedis_Exists()
+            throws NoSuchMethodException, InvocationTargetException, 
IllegalAccessException {
+        when(jedis.exists("key1")).thenReturn(true);
+        when(jedis.exists("key2")).thenReturn(false);
+
+        List<String> keys = Arrays.asList("key1", "key2");
+
+        Map<String, Object> expectedData = new HashMap<>();
+        expectedData.put("key1", true);
+        expectedData.put("key2", false);
+
+        Method method = 
RedisSource.class.getDeclaredMethod("fetchDataByJedis", Jedis.class, 
String.class, List.class,
+                String.class);
+        method.setAccessible(true);
+        Map<String, Object> result = (Map<String, Object>) 
method.invoke(redisSource, jedis, "EXISTS", keys, null);
+        assertEquals(expectedData, result);
+
+        verify(jedis, times(1)).exists("key1");
+        verify(jedis, times(1)).exists("key2");
+        executor.shutdown();
+    }
+}

Reply via email to