emptyOVO commented on code in PR #11084:
URL: https://github.com/apache/inlong/pull/11084#discussion_r1756138334


##########
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java:
##########
@@ -17,54 +17,279 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
+import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.InstanceProfile;
-import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
 
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.TypeAdapter;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import com.moilioncircle.redis.replicator.RedisReplicator;
+import com.moilioncircle.redis.replicator.Replicator;
+import com.moilioncircle.redis.replicator.cmd.CommandName;
+import com.moilioncircle.redis.replicator.cmd.impl.DefaultCommand;
+import com.moilioncircle.redis.replicator.cmd.parser.DefaultCommandParser;
+import com.moilioncircle.redis.replicator.event.PostRdbSyncEvent;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueHash;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueList;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueSet;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueString;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueZSet;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyValuePair;
+import com.moilioncircle.redis.replicator.rdb.datatype.ZSetEntry;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Pipeline;
+import redis.clients.jedis.exceptions.JedisConnectionException;
 
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Redis source
  */
 public class RedisSource extends AbstractSource {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(RedisSource.class);
+    private static final long MAX_DATA_SIZE = 500 * 1024;
+    private static final int REDIS_QUEUE_SIZE = 10000;
+    private Gson gson;
+
+    public InstanceProfile profile;
+    private String port;
+    private Jedis jedis;
+    private String hostName;
+    private boolean ssl;
+    private String authUser;
+    private String authPassword;
+    private String readTimeout;
+    private String replId;
+    private String snapShot;
+    private String dbNumber;
+    private String redisCommand;
+
+    private String fieldOrMember;
+    private boolean destroyed;
+    private boolean isSubscribe;
+    private Set<String> keys;
+    private Replicator redisReplicator;
+    private BlockingQueue<SourceData> redisQueue;
+    private ScheduledExecutorService executor;
 
     public RedisSource() {
 
     }
 
     @Override
     protected String getThreadName() {
-        return null;
+        return "redis-source-" + taskId + "-" + instanceId;
     }
 
     @Override
     protected void initSource(InstanceProfile profile) {
+        LOGGER.info("Redis Source init: {}", profile.toJsonStr());
+        this.port = profile.get(TaskConstants.TASK_REDIS_PORT);
+        this.hostName = profile.get(TaskConstants.TASK_REDIS_HOSTNAME);
+        this.ssl = profile.getBoolean(TaskConstants.TASK_REDIS_SSL, false);
+        this.authUser = profile.get(TaskConstants.TASK_REDIS_AUTHUSER, "");
+        this.authPassword = profile.get(TaskConstants.TASK_REDIS_AUTHPASSWORD, 
"");
+        this.readTimeout = profile.get(TaskConstants.TASK_REDIS_READTIMEOUT, 
"");
+        this.replId = profile.get(TaskConstants.TASK_REDIS_REPLID, "");
+        this.snapShot = profile.get(TaskConstants.TASK_REDIS_OFFSET, "-1");
+        this.dbNumber = profile.get(TaskConstants.TASK_REDIS_DB_NUMBER, "0");
+        this.keys = new 
ConcurrentSkipListSet<>(Arrays.asList(profile.get(TaskConstants.TASK_REDIS_KEYS).split(",")));
+        this.isSubscribe = 
profile.getBoolean(TaskConstants.TASK_REDIS_IS_SUBSCRIBE, false);
+        this.instanceId = profile.getInstanceId();
+        this.redisQueue = new LinkedBlockingQueue<>(REDIS_QUEUE_SIZE);
+        initGson();
+        String uri = getRedisUri();
+        try {
+            if (isSubscribe) {
+                // use subscribe mode
+                this.executor = (ScheduledExecutorService) 
Executors.newSingleThreadExecutor();
+                redisReplicator = new RedisReplicator(uri);
+                initReplicator();
+                executor.execute(startReplicatorSync());
+            } else {
+                this.executor = Executors.newScheduledThreadPool(1);
+                // use command mode
+                this.redisCommand = 
profile.get(TaskConstants.TASK_REDIS_COMMAND, "get");
+                this.fieldOrMember = 
profile.get(TaskConstants.TASK_REDIS_FIELD_OR_MEMBER, null);
+                // default frequency 1min
+                long syncFreq = 
profile.getLong(TaskConstants.TASK_REDIS_SYNC_FREQ, 60 * 1000);
+                this.jedis = new Jedis(uri);
+                jedis.connect();
+                executor.scheduleWithFixedDelay(startJedisSync(), 0, syncFreq, 
TimeUnit.MILLISECONDS);
+            }
+        } catch (URISyntaxException | IOException | JedisConnectionException 
e) {
+            sourceMetric.pluginReadFailCount.addAndGet(1);
+            LOGGER.error("Connect to redis {}:{} failed.", hostName, port, e);
+        }
+    }
 
+    private Runnable startReplicatorSync() {
+        return () -> {
+            AgentThreadFactory.nameThread(getThreadName() + "redis subscribe 
mode");
+            executor.execute(new Thread(() -> {
+                try {
+                    this.redisReplicator.open();
+                } catch (IOException e) {
+                    LOGGER.error("Redis source error, fail to start 
replicator", e);
+                }
+            }));
+        };
     }
 
-    @Override
-    protected void printCurrentState() {
+    private Runnable startJedisSync() {
+        return () -> {
+            AgentThreadFactory.nameThread(getThreadName() + "redis command 
mode");
+            executor.execute(new Thread(() -> {
+                Map<String, Object> dataMap =
+                        fetchDataByJedis(jedis, redisCommand, new 
ArrayList<>(keys), fieldOrMember);
+                synchronizeData(gson.toJson(dataMap));
+            }));
+        };
+    }
+
+    private Map<String, Object> fetchDataByJedis(Jedis jedis, String command, 
List<String> keys, String fieldOrMember) {
+        Map<String, Object> result = new HashMap<>();
+        switch (command.toUpperCase()) {
+            case "GET":

Review Comment:
   Done. I have now changed the command string to a constant and use reflection 
to handle it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to