emptyOVO commented on code in PR #11084: URL: https://github.com/apache/inlong/pull/11084#discussion_r1756138334
########## inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java: ########## @@ -17,54 +17,279 @@ package org.apache.inlong.agent.plugin.sources; +import org.apache.inlong.agent.common.AgentThreadFactory; import org.apache.inlong.agent.conf.InstanceProfile; -import org.apache.inlong.agent.plugin.Message; +import org.apache.inlong.agent.constant.TaskConstants; +import org.apache.inlong.agent.metrics.audit.AuditUtils; import org.apache.inlong.agent.plugin.sources.file.AbstractSource; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.TypeAdapter; +import com.google.gson.stream.JsonReader; +import com.google.gson.stream.JsonWriter; +import com.moilioncircle.redis.replicator.RedisReplicator; +import com.moilioncircle.redis.replicator.Replicator; +import com.moilioncircle.redis.replicator.cmd.CommandName; +import com.moilioncircle.redis.replicator.cmd.impl.DefaultCommand; +import com.moilioncircle.redis.replicator.cmd.parser.DefaultCommandParser; +import com.moilioncircle.redis.replicator.event.PostRdbSyncEvent; +import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueHash; +import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueList; +import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueSet; +import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueString; +import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueZSet; +import com.moilioncircle.redis.replicator.rdb.datatype.KeyValuePair; +import com.moilioncircle.redis.replicator.rdb.datatype.ZSetEntry; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.Pipeline; +import redis.clients.jedis.exceptions.JedisConnectionException; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** * Redis source */ public class RedisSource extends AbstractSource { private static final Logger LOGGER = LoggerFactory.getLogger(RedisSource.class); + private static final long MAX_DATA_SIZE = 500 * 1024; + private static final int REDIS_QUEUE_SIZE = 10000; + private Gson gson; + + public InstanceProfile profile; + private String port; + private Jedis jedis; + private String hostName; + private boolean ssl; + private String authUser; + private String authPassword; + private String readTimeout; + private String replId; + private String snapShot; + private String dbNumber; + private String redisCommand; + + private String fieldOrMember; + private boolean destroyed; + private boolean isSubscribe; + private Set<String> keys; + private Replicator redisReplicator; + private BlockingQueue<SourceData> redisQueue; + private ScheduledExecutorService executor; public RedisSource() { } @Override protected String getThreadName() { - return null; + return "redis-source-" + taskId + "-" + instanceId; } @Override protected void initSource(InstanceProfile profile) { + LOGGER.info("Redis Source init: {}", profile.toJsonStr()); + this.port = profile.get(TaskConstants.TASK_REDIS_PORT); + this.hostName = profile.get(TaskConstants.TASK_REDIS_HOSTNAME); + this.ssl = profile.getBoolean(TaskConstants.TASK_REDIS_SSL, false); + this.authUser = profile.get(TaskConstants.TASK_REDIS_AUTHUSER, ""); + this.authPassword = profile.get(TaskConstants.TASK_REDIS_AUTHPASSWORD, ""); + this.readTimeout = profile.get(TaskConstants.TASK_REDIS_READTIMEOUT, ""); + this.replId = profile.get(TaskConstants.TASK_REDIS_REPLID, ""); + this.snapShot = profile.get(TaskConstants.TASK_REDIS_OFFSET, "-1"); + this.dbNumber = profile.get(TaskConstants.TASK_REDIS_DB_NUMBER, "0"); + this.keys = new ConcurrentSkipListSet<>(Arrays.asList(profile.get(TaskConstants.TASK_REDIS_KEYS).split(","))); + this.isSubscribe = profile.getBoolean(TaskConstants.TASK_REDIS_IS_SUBSCRIBE, false); + this.instanceId = profile.getInstanceId(); + this.redisQueue = new LinkedBlockingQueue<>(REDIS_QUEUE_SIZE); + initGson(); + String uri = getRedisUri(); + try { + if (isSubscribe) { + // use subscribe mode + this.executor = (ScheduledExecutorService) Executors.newSingleThreadExecutor(); + redisReplicator = new RedisReplicator(uri); + initReplicator(); + executor.execute(startReplicatorSync()); + } else { + this.executor = Executors.newScheduledThreadPool(1); + // use command mode + this.redisCommand = profile.get(TaskConstants.TASK_REDIS_COMMAND, "get"); + this.fieldOrMember = profile.get(TaskConstants.TASK_REDIS_FIELD_OR_MEMBER, null); + // default frequency 1min + long syncFreq = profile.getLong(TaskConstants.TASK_REDIS_SYNC_FREQ, 60 * 1000); + this.jedis = new Jedis(uri); + jedis.connect(); + executor.scheduleWithFixedDelay(startJedisSync(), 0, syncFreq, TimeUnit.MILLISECONDS); + } + } catch (URISyntaxException | IOException | JedisConnectionException e) { + sourceMetric.pluginReadFailCount.addAndGet(1); + LOGGER.error("Connect to redis {}:{} failed.", hostName, port, e); + } + } + private Runnable startReplicatorSync() { + return () -> { + AgentThreadFactory.nameThread(getThreadName() + "redis subscribe mode"); + executor.execute(new Thread(() -> { + try { + this.redisReplicator.open(); + } catch (IOException e) { + LOGGER.error("Redis source error, fail to start replicator", e); + } + })); + }; } - @Override - protected void printCurrentState() { + private Runnable startJedisSync() { + return () -> { + AgentThreadFactory.nameThread(getThreadName() + "redis command mode"); + executor.execute(new Thread(() -> { + Map<String, Object> dataMap = + fetchDataByJedis(jedis, redisCommand, new ArrayList<>(keys), fieldOrMember); + synchronizeData(gson.toJson(dataMap)); + })); + }; + } + + private Map<String, Object> fetchDataByJedis(Jedis jedis, String command, List<String> keys, String fieldOrMember) { + Map<String, Object> result = new HashMap<>(); + switch (command.toUpperCase()) { + case "GET": Review Comment: done, now i change the command string to a constant and use reflections to handle -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org