justinwwhuang commented on code in PR #10801:
URL: https://github.com/apache/inlong/pull/10801#discussion_r1735556446


##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java:
##########
@@ -17,54 +17,169 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
+import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.InstanceProfile;
-import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
 
+import com.moilioncircle.redis.replicator.RedisReplicator;
+import com.moilioncircle.redis.replicator.Replicator;
+import com.moilioncircle.redis.replicator.event.PostRdbSyncEvent;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyValuePair;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
 
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Redis source
  */
 public class RedisSource extends AbstractSource {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(RedisSource.class);
+    public InstanceProfile profile;
+    private String port;
+    private Jedis jedis;
+    private String hostName;
+    private boolean ssl;
+    private String authUser;
+    private String authPassword;
+    private String readTimeout;
+    private String replId;
+    private String snapShot;
+    private String dbNumber;
+    private String redisCommand;
+
+    private String fieldOrMember;
+    private boolean destroyed;
+
+    private Set<String> keys;
+    private Replicator redisReplicator;
+    private BlockingQueue<SourceData> redisQueue;
+    private ExecutorService executor;
 
     public RedisSource() {
 
     }
 
     @Override
     protected String getThreadName() {
-        return null;
+        return "redis-source-" + taskId + "-" + instanceId;
     }
 
     @Override
     protected void initSource(InstanceProfile profile) {
+        LOGGER.info("Redis Source init: {}", profile.toJsonStr());
+        this.port = profile.get(TaskConstants.TASK_REDIS_PORT);
+        this.hostName = profile.get(TaskConstants.TASK_REDIS_HOSTNAME);
+        this.ssl = profile.getBoolean(TaskConstants.TASK_REDIS_SSL, false);
+        this.authUser = profile.get(TaskConstants.TASK_REDIS_AUTHUSER, "");
+        this.authPassword = profile.get(TaskConstants.TASK_REDIS_AUTHPASSWORD, 
"");
+        this.readTimeout = profile.get(TaskConstants.TASK_REDIS_READTIMEOUT, 
"");
+        this.replId = profile.get(TaskConstants.TASK_REDIS_REPLID, "");
+        this.snapShot = profile.get(TaskConstants.TASK_REDIS_OFFSET, "-1");
+        this.dbNumber = profile.get(TaskConstants.TASK_REDIS_DB_NUMBER, "0");
+        this.redisCommand = profile.get(TaskConstants.TASK_REDIS_COMMAND, 
"get");
+        this.keys = new 
ConcurrentSkipListSet<>(Arrays.asList(profile.get(TaskConstants.TASK_REDIS_KEYS).split(",")));
+        this.fieldOrMember = 
profile.get(TaskConstants.TASK_REDIS_FIELD_OR_NUMBER, null);
+        this.instanceId = profile.getInstanceId();
+        this.redisQueue = new 
LinkedBlockingQueue<>(profile.getInt(TaskConstants.TASK_REDIS_QUEUE_SIZE, 
10000));
+        String uri = getRedisUri();
+        this.jedis = new Jedis(uri);
+        try {
+            redisReplicator = new RedisReplicator(uri);
+            startJedisSynchronize();
+            initReplicator();
+            executor = Executors.newSingleThreadExecutor();
+            executor.execute(startRedisReplicator());
+        } catch (URISyntaxException | IOException e) {
+            sourceMetric.pluginReadFailCount.addAndGet(1);
+            LOGGER.error("Connect to redis {}:{} failed.", hostName, port);
+        }
+    }
 
+    private void startJedisSynchronize() {
+        for (String key : keys) {

Review Comment:
   Can data acquisition be done on a scheduled basis instead of just once?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to