This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new cc82864873 [INLONG-10287][Agent] Update the Redis Source (#11084) cc82864873 is described below commit cc828648738e60e7796546a95bb95f94d2e90844 Author: emptyOVO <118812562+empty...@users.noreply.github.com> AuthorDate: Tue Oct 8 18:58:53 2024 +0800 [INLONG-10287][Agent] Update the Redis Source (#11084) --- .../inlong/agent/constant/TaskConstants.java | 16 + .../org/apache/inlong/agent/pojo/RedisTask.java | 15 +- .../apache/inlong/agent/pojo/TaskProfileDto.java | 10 +- inlong-agent/agent-plugins/pom.xml | 4 + .../agent/plugin/instance/RedisInstance.java} | 31 +- .../inlong/agent/plugin/sources/RedisSource.java | 592 ++++++++++++++++++++- .../apache/inlong/agent/plugin/task/RedisTask.java | 78 +++ .../agent/plugin/sources/TestRedisSource.java | 259 +++++++++ 8 files changed, 967 insertions(+), 38 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java index 9398d70640..fbf0b5b705 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java @@ -178,6 +178,22 @@ public class TaskConstants extends CommonConstants { public static final String TASK_SQLSERVER_UNIX_TIMESTAMP_FORMAT_ENABLE = "task.sqlserverTask.unixTimestampFormatEnable"; + public static final String TASK_REDIS_PORT = "task.redisTask.port"; + public static final String TASK_REDIS_HOSTNAME = "task.redisTask.hostname"; + public static final String TASK_REDIS_SSL = "task.redisTask.ssl"; + public static final String TASK_REDIS_AUTHUSER = "task.redisTask.authUser"; + public static final String TASK_REDIS_AUTHPASSWORD = "task.redisTask.authPassword"; + public static final String TASK_REDIS_READTIMEOUT = "task.redisTask.readTimeout"; + public static final String TASK_REDIS_REPLID = "task.redisTask.replId"; + public static final String TASK_REDIS_OFFSET = "task.redisTask.offset"; + public static final String TASK_REDIS_DB_NUMBER = "task.redisTask.dbNumber"; + public static final String TASK_REDIS_COMMAND = "task.redisTask.command"; + public static final String TASK_REDIS_KEYS = "task.redisTask.keys"; + public static final String TASK_REDIS_FIELD_OR_MEMBER = "task.redisTask.fieldOrMember"; + public static final String TASK_REDIS_IS_SUBSCRIBE = "task.redisTask.isSubscribe"; + public static final String TASK_REDIS_SUBOPERATION = "task.redisTask.subOperation"; + public static final String TASK_REDIS_SYNC_FREQ = "task.redisTask.syncFreq"; + public static final String TASK_STATE = "task.state"; public static final String INSTANCE_STATE = "instance.state"; diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/RedisTask.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/RedisTask.java index 701bea2b8e..2b8a5c4ca6 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/RedisTask.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/RedisTask.java @@ -18,7 +18,6 @@ package org.apache.inlong.agent.pojo; import lombok.Data; - @Data public class RedisTask { @@ -30,6 +29,13 @@ public class RedisTask { private String readTimeout; private String queueSize; private String replId; + private String dbNumber; + private String command; + private String keys; + private String fieldOrMember; + private Boolean isSubscribe; + private String syncFreq; + private String subOperations; @Data public static class RedisTaskConfig { @@ -42,5 +48,12 @@ public class RedisTask { private String timeout; private String queueSize; private String replId; + private String dbNumber; + private String command; + private String keys; + private String fieldOrMember; + private Boolean isSubscribe; + private String syncFreq; + private String subOperations; } } diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java index bb602e2c61..1558bc042f 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java @@ -53,6 +53,7 @@ public class TaskProfileDto { public static final String DEFAULT_PULSAR_TASK = "org.apache.inlong.agent.plugin.task.PulsarTask"; public static final String DEFAULT_MONGODB_TASK = "org.apache.inlong.agent.plugin.task.MongoDBTask"; public static final String DEFAULT_ORACLE_TASK = "org.apache.inlong.agent.plugin.task.OracleTask"; + public static final String DEFAULT_REDIS_TASK = "org.apache.inlong.agent.plugin.task.RedisTask"; public static final String DEFAULT_POSTGRESQL_TASK = "org.apache.inlong.agent.plugin.task.PostgreSQLTask"; public static final String DEFAULT_MQTT_TASK = "org.apache.inlong.agent.plugin.task.MqttTask"; public static final String DEFAULT_SQLSERVER_TASK = "org.apache.inlong.agent.plugin.task.SQLServerTask"; @@ -274,8 +275,14 @@ public class TaskProfileDto { redisTask.setPort(config.getPort()); redisTask.setSsl(config.getSsl()); redisTask.setReadTimeout(config.getTimeout()); - redisTask.setQueueSize(config.getQueueSize()); redisTask.setReplId(config.getReplId()); + redisTask.setCommand(config.getCommand()); + redisTask.setDbNumber(config.getDbNumber()); + redisTask.setKeys(config.getKeys()); + redisTask.setFieldOrMember(config.getFieldOrMember()); + redisTask.setIsSubscribe(config.getIsSubscribe()); + redisTask.setSyncFreq(config.getSyncFreq()); + redisTask.setSubOperations(config.getSubOperations()); return redisTask; } @@ -521,6 +528,7 @@ public class TaskProfileDto { profileDto.setTask(task); break; case REDIS: + task.setTaskClass(DEFAULT_REDIS_TASK); RedisTask redisTask = getRedisTask(dataConfig); task.setRedisTask(redisTask); task.setSource(REDIS_SOURCE); diff --git a/inlong-agent/agent-plugins/pom.xml b/inlong-agent/agent-plugins/pom.xml index eb092bfed0..750141798b 100644 --- a/inlong-agent/agent-plugins/pom.xml +++ b/inlong-agent/agent-plugins/pom.xml @@ -42,6 +42,10 @@ <artifactId>ojdbc8</artifactId> <version>${ojdbc.version}</version> </dependency> + <dependency> + <groupId>redis.clients</groupId> + <artifactId>jedis</artifactId> + </dependency> <dependency> <groupId>org.apache.inlong</groupId> <artifactId>agent-common</artifactId> diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/RedisTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/RedisInstance.java similarity index 56% copy from inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/RedisTask.java copy to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/RedisInstance.java index 701bea2b8e..f34c3db923 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/RedisTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/RedisInstance.java @@ -15,32 +15,15 @@ * limitations under the License. */ -package org.apache.inlong.agent.pojo; +package org.apache.inlong.agent.plugin.instance; -import lombok.Data; +import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.constant.TaskConstants; -@Data -public class RedisTask { +public class RedisInstance extends CommonInstance { - private String authUser; - private String authPassword; - private String hostname; - private String port; - private Boolean ssl; - private String readTimeout; - private String queueSize; - private String replId; - - @Data - public static class RedisTaskConfig { - - private String username; - private String password; - private String hostname; - private String port; - private Boolean ssl; - private String timeout; - private String queueSize; - private String replId; + @Override + public void setInodeInfo(InstanceProfile profile) { + profile.set(TaskConstants.INODE_INFO, ""); } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java index b7fea66e4c..aada1bf504 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java @@ -17,14 +17,53 @@ package org.apache.inlong.agent.plugin.sources; +import org.apache.inlong.agent.common.AgentThreadFactory; import org.apache.inlong.agent.conf.InstanceProfile; -import org.apache.inlong.agent.plugin.Message; +import org.apache.inlong.agent.constant.TaskConstants; +import org.apache.inlong.agent.metrics.audit.AuditUtils; import org.apache.inlong.agent.plugin.sources.file.AbstractSource; +import com.google.common.collect.Maps; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.TypeAdapter; +import com.google.gson.stream.JsonReader; +import com.google.gson.stream.JsonWriter; +import com.moilioncircle.redis.replicator.RedisReplicator; +import com.moilioncircle.redis.replicator.Replicator; +import com.moilioncircle.redis.replicator.cmd.CommandName; +import com.moilioncircle.redis.replicator.cmd.impl.DefaultCommand; +import com.moilioncircle.redis.replicator.cmd.parser.DefaultCommandParser; +import com.moilioncircle.redis.replicator.event.PostRdbSyncEvent; +import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueHash; +import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueList; +import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueSet; +import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueString; +import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueZSet; +import com.moilioncircle.redis.replicator.rdb.datatype.KeyValuePair; +import com.moilioncircle.redis.replicator.rdb.datatype.ZSetEntry; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.Pipeline; +import redis.clients.jedis.exceptions.JedisConnectionException; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** * Redis source @@ -32,6 +71,50 @@ import java.util.List; public class RedisSource extends AbstractSource { private static final Logger LOGGER = LoggerFactory.getLogger(RedisSource.class); + private static final long MAX_DATA_SIZE = 500 * 1024; + private static final int REDIS_QUEUE_SIZE = 10000; + private static final long DEFAULT_FREQ = 60 * 1000; + private static final String GET_COMMAND = "GET"; + private static final String MGET_COMMAND = "MGET"; + private static final String HGET_COMMAND = "HGET"; + private static final String ZSCORE_COMMAND = "ZSCORE"; + private static final String ZREVRANK_COMMAND = "ZREVRANK"; + private static final String EXISTS_COMMAND = "EXISTS"; + private Gson gson; + + public InstanceProfile profile; + private String port; + private Jedis jedis; + private String hostName; + private boolean ssl; + private String authUser; + private String authPassword; + private String readTimeout; + private String replId; + private String snapShot; + private String dbNumber; + private String redisCommand; + + private String fieldOrMember; + private boolean destroyed; + private boolean isSubscribe; + private Set<String> keys; + private Set<String> subOperations; + private Replicator redisReplicator; + private BlockingQueue<SourceData> redisQueue; + private ScheduledExecutorService executor; + + // Command handler map + private static final Map<String, CommandHandler> commandHandlers = Maps.newConcurrentMap(); + + static { + commandHandlers.put(GET_COMMAND, RedisSource::handleGet); + commandHandlers.put(MGET_COMMAND, RedisSource::handleMGet); + commandHandlers.put(HGET_COMMAND, RedisSource::handleHGet); + commandHandlers.put(ZSCORE_COMMAND, RedisSource::handleZScore); + commandHandlers.put(ZREVRANK_COMMAND, RedisSource::handleZRevRank); + commandHandlers.put(EXISTS_COMMAND, RedisSource::handleExists); + } public RedisSource() { @@ -39,32 +122,207 @@ public class RedisSource extends AbstractSource { @Override protected String getThreadName() { - return null; + return "redis-source-" + taskId + "-" + instanceId; } @Override protected void initSource(InstanceProfile profile) { + LOGGER.info("Redis Source init: {}", profile.toJsonStr()); + this.port = profile.get(TaskConstants.TASK_REDIS_PORT); + this.hostName = profile.get(TaskConstants.TASK_REDIS_HOSTNAME); + this.ssl = profile.getBoolean(TaskConstants.TASK_REDIS_SSL, false); + this.authUser = profile.get(TaskConstants.TASK_REDIS_AUTHUSER, ""); + this.authPassword = profile.get(TaskConstants.TASK_REDIS_AUTHPASSWORD, ""); + this.readTimeout = profile.get(TaskConstants.TASK_REDIS_READTIMEOUT, ""); + this.replId = profile.get(TaskConstants.TASK_REDIS_REPLID, ""); + this.snapShot = profile.get(TaskConstants.TASK_REDIS_OFFSET, "-1"); + this.dbNumber = profile.get(TaskConstants.TASK_REDIS_DB_NUMBER, "0"); + this.keys = new ConcurrentSkipListSet<>(Arrays.asList(profile.get(TaskConstants.TASK_REDIS_KEYS).split(","))); + this.isSubscribe = profile.getBoolean(TaskConstants.TASK_REDIS_IS_SUBSCRIBE, false); + this.instanceId = profile.getInstanceId(); + this.redisQueue = new LinkedBlockingQueue<>(REDIS_QUEUE_SIZE); + initGson(); + String uri = getRedisUri(); + try { + if (isSubscribe) { + // use subscribe mode + this.subOperations = new ConcurrentSkipListSet<>( + Arrays.asList(profile.get(TaskConstants.TASK_REDIS_SUBOPERATION).split(","))); + this.executor = (ScheduledExecutorService) Executors.newSingleThreadExecutor(); + this.redisReplicator = new RedisReplicator(uri); + initReplicator(); + this.executor.execute(startReplicatorSync()); + } else { + this.executor = Executors.newScheduledThreadPool(1); + // use command mode + this.redisCommand = profile.get(TaskConstants.TASK_REDIS_COMMAND, GET_COMMAND); + this.fieldOrMember = profile.get(TaskConstants.TASK_REDIS_FIELD_OR_MEMBER, null); + // default frequency 1min + long syncFreq = profile.getLong(TaskConstants.TASK_REDIS_SYNC_FREQ, DEFAULT_FREQ); + this.jedis = new Jedis(uri); + jedis.connect(); + this.executor.scheduleWithFixedDelay(startJedisSync(), 0, syncFreq, TimeUnit.MILLISECONDS); + } + } catch (URISyntaxException | IOException | JedisConnectionException e) { + sourceMetric.pluginReadFailCount.addAndGet(1); + LOGGER.error("Connect to redis {}:{} failed.", hostName, port, e); + } + } + private Runnable startReplicatorSync() { + return () -> { + AgentThreadFactory.nameThread(getThreadName() + "redis subscribe mode"); + executor.execute(new Thread(() -> { + try { + this.redisReplicator.open(); + } catch (IOException e) { + LOGGER.error("Redis source error, fail to start replicator", e); + } + })); + }; } - @Override - protected void printCurrentState() { + private Runnable startJedisSync() { + return () -> { + AgentThreadFactory.nameThread(getThreadName() + "redis command mode"); + executor.execute(new Thread(() -> { + Map<String, Object> dataMap = + fetchDataByJedis(jedis, redisCommand, new ArrayList<>(keys), fieldOrMember); + synchronizeData(gson.toJson(dataMap)); + })); + }; + } + + private Map<String, Object> fetchDataByJedis(Jedis jedis, String command, List<String> keys, String fieldOrMember) { + Map<String, Object> result = new HashMap<>(); + CommandHandler handler = commandHandlers.get(command.toUpperCase()); + if (handler != null) { + handler.handle(jedis, keys, fieldOrMember, result); + } else { + LOGGER.error("Unsupported command: " + command); + throw new UnsupportedOperationException("Unsupported command: " + command); + } + return result; + } + private static void handleGet(Jedis jedis, List<String> keys, String fieldOrMember, Map<String, Object> result) { + Pipeline pipeline = jedis.pipelined(); + for (String key : keys) { + pipeline.get(key); + } + List<Object> getValues = pipeline.syncAndReturnAll(); + for (int i = 0; i < keys.size(); i++) { + result.put(keys.get(i), getValues.get(i)); + } + } + + private static void handleMGet(Jedis jedis, List<String> keys, String fieldOrMember, Map<String, Object> result) { + List<String> mGetValues = jedis.mget(keys.toArray(new String[0])); + for (int i = 0; i < keys.size(); i++) { + result.put(keys.get(i), mGetValues.get(i)); + } + } + + private static void handleHGet(Jedis jedis, List<String> keys, String fieldOrMember, Map<String, Object> result) { + for (String key : keys) { + String value = jedis.hget(key, fieldOrMember); + result.put(key, value); + } + } + + private static void handleZScore(Jedis jedis, List<String> keys, String fieldOrMember, Map<String, Object> result) { + for (String key : keys) { + if (!StringUtils.isEmpty(fieldOrMember)) { + Double score = jedis.zscore(key, fieldOrMember); + result.put(key, score); + } + } + } + + private static void handleZRevRank(Jedis jedis, List<String> keys, String fieldOrMember, + Map<String, Object> result) { + for (String key : keys) { + if (!StringUtils.isEmpty(fieldOrMember)) { + Long rank = jedis.zrevrank(key, fieldOrMember); + result.put(key, rank); + } + } + } + + private static void handleExists(Jedis jedis, List<String> keys, String fieldOrMember, Map<String, Object> result) { + for (String key : keys) { + boolean exists = jedis.exists(key); + result.put(key, exists); + } + } + + // Functional interface for handling commands + @FunctionalInterface + private interface CommandHandler { + + void handle(Jedis jedis, List<String> keys, String fieldOrMember, Map<String, Object> result); + } + + private void synchronizeData(String data) { + try { + if (!StringUtils.isEmpty(data)) { + byte[] dataBytes = data.getBytes(StandardCharsets.UTF_8); + // limit data size + if (dataBytes.length <= MAX_DATA_SIZE) { + SourceData sourceData = new SourceData(dataBytes, "0L"); + boolean offerSuc = false; + while (isRunnable() && !offerSuc) { + offerSuc = redisQueue.offer(sourceData, 1, TimeUnit.SECONDS); + } + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId, + System.currentTimeMillis(), 1, data.length()); + sourceMetric.pluginReadCount.incrementAndGet(); + } else { + sourceMetric.pluginReadFailCount.incrementAndGet(); + LOGGER.warn("Read redis data warn, data overload, Automatically skip and discard"); + } + } + } catch (InterruptedException e) { + sourceMetric.pluginReadFailCount.incrementAndGet(); + LOGGER.error("Read redis data error", e); + } } @Override - protected boolean doPrepareToRead() { - return false; + protected void printCurrentState() { + if (isSubscribe) { + LOGGER.info("redis subscribe synchronization is {} on source {}", + redisReplicator != null && !executor.isShutdown() ? "running" : "stop", + hostName + ":" + port); + } else { + LOGGER.info("redis command synchronization is {} on source {}", !executor.isShutdown() ? "running" : "stop", + hostName + ":" + port); + } } @Override - protected List<SourceData> readFromSource() { - return null; + protected boolean doPrepareToRead() { + return true; } @Override - public Message read() { - return null; + protected List<SourceData> readFromSource() { + List<SourceData> dataList = new ArrayList<>(); + try { + int size = 0; + while (size < BATCH_READ_LINE_TOTAL_LEN) { + SourceData sourceData = redisQueue.poll(1, TimeUnit.SECONDS); + if (sourceData != null) { + size += sourceData.getData().length; + dataList.add(sourceData); + } else { + break; + } + } + } catch (InterruptedException e) { + LOGGER.error("poll {} data from redis queue interrupted.", instanceId); + } + return dataList; } @Override @@ -74,7 +332,23 @@ public class RedisSource extends AbstractSource { @Override protected void releaseSource() { - + LOGGER.info("releasing redis source"); + if (!destroyed) { + try { + executor.shutdown(); + // subscribe mode then close replicator + if (redisReplicator != null) { + redisReplicator.close(); + } + // command mode then close jedis + if (jedis.isConnected()) { + jedis.close(); + } + } catch (IOException e) { + LOGGER.error("Redis reader close failed."); + } + destroyed = true; + } } @Override @@ -84,6 +358,300 @@ public class RedisSource extends AbstractSource { @Override public boolean sourceExist() { - return false; + return true; + } + + private String getRedisUri() { + StringBuffer sb = new StringBuffer("redis://"); + sb.append(hostName).append(":").append(port); + if (!StringUtils.isEmpty(dbNumber)) { + sb.append("/").append(dbNumber); + } + sb.append("?"); + if (!StringUtils.isEmpty(authPassword)) { + sb.append("authPassword=").append(authPassword).append("&"); + } + if (!StringUtils.isEmpty(authUser)) { + sb.append("authUser=").append(authUser).append("&"); + } + if (!StringUtils.isEmpty(readTimeout)) { + sb.append("readTimeout=").append(readTimeout).append("&"); + } + if (ssl) { + sb.append("ssl=").append("yes").append("&"); + } + if (!StringUtils.isEmpty(snapShot)) { + sb.append("replOffset=").append(snapShot).append("&"); + } + if (!StringUtils.isEmpty(replId)) { + sb.append("replId=").append(replId).append("&"); + } + if (sb.charAt(sb.length() - 1) == '?' || sb.charAt(sb.length() - 1) == '&') { + sb.deleteCharAt(sb.length() - 1); + } + return sb.toString(); + } + + private void initReplicator() { + if (!subOperations.isEmpty()) { + DefaultCommandParser replicatorCommandParser = new DefaultCommandParser(); + for (String subOperation : subOperations) { + this.redisReplicator.addCommandParser(CommandName.name(subOperation), replicatorCommandParser); + } + this.redisReplicator.addEventListener((replicator, event) -> { + if (event instanceof DefaultCommand) { + DefaultCommand defaultCommand = (DefaultCommand) event; + Object[] args = defaultCommand.getArgs(); + if (args[0] instanceof byte[]) { + String key = new String((byte[]) args[0], StandardCharsets.UTF_8); + if (keys.contains(key)) { + synchronizeData(gson.toJson(event)); + } + } + } + if (event instanceof PostRdbSyncEvent) { + this.snapShot = String.valueOf(replicator.getConfiguration().getReplOffset()); + LOGGER.info("after rdb snapShot is: {}", snapShot); + } + }); + } else { + // if SubOperation is not configured, subscribe all modification + initDefaultReplicator(); + } + } + + private void initDefaultReplicator() { + DefaultCommandParser defaultCommandParser = new DefaultCommandParser(); + this.redisReplicator.addCommandParser(CommandName.name("APPEND"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("SET"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("SETEX"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("MSET"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("DEL"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("SADD"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("HMSET"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("HSET"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("LSET"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("EXPIRE"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("EXPIREAT"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("GETSET"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("HSETNX"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("PSETEX"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("SETNX"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("SETRANGE"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("HDEL"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("LPOP"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("LPUSH"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("LPUSHX"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("LRem"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("RPOP"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("RPUSH"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("RPUSHX"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("ZREM"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("RENAME"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("INCR"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("DECR"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("INCRBY"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("DECRBY"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("PERSIST"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("SELECT"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("FLUSHALL"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("FLUSHDB"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("HINCRBY"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("ZINCRBY"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("MOVE"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("SMOVE"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("PFADD"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("PFCOUNT"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("PFMERGE"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("SDIFFSTORE"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("SINTERSTORE"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("SUNIONSTORE"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("ZADD"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("ZINTERSTORE"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("ZUNIONSTORE"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("BRPOPLPUSH"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("LINSERT"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("RENAMENX"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("RESTORE"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("PEXPIRE"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("PEXPIREAT"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("GEOADD"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("EVAL"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("EVALSHA"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("SCRIPT"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("PUBLISH"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("BITOP"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("BITFIELD"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("SETBIT"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("SREM"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("UNLINK"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("SWAPDB"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("MULTI"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("EXEC"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYSCORE"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYRANK"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("ZREMRANGEBYLEX"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("LTRIM"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("SORT"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("RPOPLPUSH"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("ZPOPMIN"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("ZPOPMAX"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("REPLCONF"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("XACK"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("XADD"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("XCLAIM"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("XDEL"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("XGROUP"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("XTRIM"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("XSETID"), defaultCommandParser); + // since redis 6.2 + this.redisReplicator.addCommandParser(CommandName.name("COPY"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("LMOVE"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("BLMOVE"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("ZDIFFSTORE"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("GEOSEARCHSTORE"), defaultCommandParser); + // since redis 7.0 + this.redisReplicator.addCommandParser(CommandName.name("SPUBLISH"), defaultCommandParser); + this.redisReplicator.addCommandParser(CommandName.name("FUNCTION"), defaultCommandParser); + // add EventListener + this.redisReplicator.addEventListener((replicator, event) -> { + if (event instanceof KeyValuePair<?, ?> || event instanceof DefaultCommand) { + KeyValuePair<?, ?> kvEvent = (KeyValuePair<?, ?>) event; + String key = kvEvent.getKey().toString(); + if (keys.contains(key)) { + synchronizeData(gson.toJson(event)); + } + } + if (event instanceof PostRdbSyncEvent) { + this.snapShot = String.valueOf(replicator.getConfiguration().getReplOffset()); + LOGGER.info("after rdb snapShot is: {}", snapShot); + } + }); + } + + /** + * init GSON parser + */ + private void initGson() { + this.gson = + new GsonBuilder().registerTypeAdapter(KeyStringValueHash.class, new TypeAdapter<KeyStringValueHash>() { + + @Override + public void write(JsonWriter out, KeyStringValueHash kv) throws IOException { + out.beginObject(); + out.name("DB").beginObject(); + out.name("dbNumber").value(kv.getDb().getDbNumber()); + out.name("dbSize").value(kv.getDb().getDbsize()); + out.name("expires").value(kv.getDb().getExpires()); + out.endObject(); + out.name("valueRdbType").value(kv.getValueRdbType()); + out.name("key").value(new String(kv.getKey())); + out.name("value").beginObject(); + for (byte[] b : kv.getValue().keySet()) { + out.name(new String(b)).value(new String(kv.getValue().get(b))); + } + out.endObject(); + out.endObject(); + } + + @Override + public KeyStringValueHash read(JsonReader in) throws IOException { + return null; + } + }).registerTypeAdapter(DefaultCommand.class, new TypeAdapter<DefaultCommand>() { + + @Override + public void write(JsonWriter out, DefaultCommand dc) throws IOException { + out.beginObject(); + out.name("key").value(new String(dc.getCommand())); + out.name("value").beginArray(); + for (byte[] bytes : dc.getArgs()) { + out.value(new String(bytes)); + } + out.endArray(); + out.endObject(); + } + + @Override + public DefaultCommand read(JsonReader in) throws IOException { + return null; + } + }) + .registerTypeAdapter(KeyStringValueList.class, new TypeAdapter<KeyStringValueList>() { + + @Override + public void write(JsonWriter out, KeyStringValueList kv) throws IOException { + out.beginObject(); + out.name("key").value(new String(kv.getKey())); + out.name("value").beginArray(); + for (byte[] bytes : kv.getValue()) { + out.value(new String(bytes)); + } + out.endArray(); + out.endObject(); + } + + @Override + public KeyStringValueList read(JsonReader in) throws IOException { + return null; + } + }) + .registerTypeAdapter(KeyStringValueSet.class, new TypeAdapter<KeyStringValueSet>() { + + @Override + public void write(JsonWriter out, KeyStringValueSet kv) throws IOException { + out.beginObject(); + out.name("key").value(new String(kv.getKey())); + out.name("value").beginArray(); + for (byte[] bytes : kv.getValue()) { + out.value(new String(bytes)); + } + out.endArray(); + out.endObject(); + } + + @Override + public KeyStringValueSet read(JsonReader in) throws IOException { + return null; + } + }) + .registerTypeAdapter(KeyStringValueString.class, new TypeAdapter<KeyStringValueString>() { + + @Override + public void write(JsonWriter out, KeyStringValueString kv) throws IOException { + out.beginObject(); + out.name("key").value(new String(kv.getKey())); + out.name("value").value(new String(kv.getValue())); + out.endObject(); + } + + @Override + public KeyStringValueString read(JsonReader in) throws IOException { + return null; + } + }) + .registerTypeAdapter(KeyStringValueZSet.class, new TypeAdapter<KeyStringValueZSet>() { + + @Override + public void write(JsonWriter out, KeyStringValueZSet kv) throws IOException { + out.beginObject(); + out.name("key").value(new String(kv.getKey())); + out.name("value").beginArray(); + for (ZSetEntry entry : kv.getValue()) { + out.beginObject(); + out.name("element").value(new String(entry.getElement())); + out.name("score").value(entry.getScore()); + out.endObject(); + } + out.endArray(); + out.endObject(); + } + + @Override + public KeyStringValueZSet read(JsonReader in) throws IOException { + return null; + } + }) + .create(); } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/RedisTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/RedisTask.java new file mode 100644 index 0000000000..b9f7449ecd --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/RedisTask.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.task; + +import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.constant.CycleUnitType; +import org.apache.inlong.agent.constant.TaskConstants; +import org.apache.inlong.agent.utils.AgentUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; + +public class RedisTask extends AbstractTask { + + private static final Logger LOGGER = LoggerFactory.getLogger(RedisTask.class); + public static final String DEFAULT_REDIS_INSTANCE = "org.apache.inlong.agent.plugin.instance.RedisInstance"; + private boolean isAdded = false; + private String taskId; + + private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMddHH"); + + @Override + public boolean isProfileValid(TaskProfile profile) { + if (!profile.allRequiredKeyExist()) { + LOGGER.error("task profile needs all required key"); + return false; + } + return true; + } + + @Override + protected int getInstanceLimit() { + return DEFAULT_INSTANCE_LIMIT; + } + + @Override + protected void initTask() { + LOGGER.info("Redis commonInit: {}", taskProfile.toJsonStr()); + this.taskId = taskProfile.get(TaskConstants.TASK_REDIS_REPLID) + + "-" + taskProfile.get(TaskConstants.TASK_REDIS_IS_SUBSCRIBE); + } + + @Override + protected List<InstanceProfile> getNewInstanceList() { + List<InstanceProfile> list = new ArrayList<>(); + if (isAdded) { + return list; + } + String dataTime = LocalDateTime.now().format(dateTimeFormatter); + InstanceProfile instanceProfile = taskProfile.createInstanceProfile(DEFAULT_REDIS_INSTANCE, taskId, + CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime()); + LOGGER.info("taskProfile.createInstanceProfile: {}", instanceProfile.toJsonStr()); + list.add(instanceProfile); + this.isAdded = true; + return list; + } +} diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java new file mode 100644 index 0000000000..061a74c092 --- /dev/null +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.sources; + +import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.constant.AgentConstants; +import org.apache.inlong.agent.constant.CommonConstants; +import org.apache.inlong.agent.constant.TaskConstants; +import org.apache.inlong.agent.core.task.OffsetManager; +import org.apache.inlong.agent.core.task.TaskManager; +import org.apache.inlong.agent.plugin.AgentBaseTestsHelper; +import org.apache.inlong.agent.store.Store; +import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.common.enums.TaskStateEnum; +import org.apache.inlong.common.metric.MetricRegister; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.Pipeline; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.mockStatic; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Test cases for {@link RedisSource}. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({Executors.class, RedisSource.class, MetricRegister.class}) +@PowerMockIgnore({"javax.management.*"}) +public class TestRedisSource { + + private static final Logger LOGGER = LoggerFactory.getLogger(TestRedisSource.class); + + private static AgentBaseTestsHelper helper; + + private final String instanceId = "s4bc475560b4444dbd4e9812ab1fd64d"; + + private static Store taskBasicStore; + private static Store instanceBasicStore; + private static Store offsetBasicStore; + + @Mock + private InstanceProfile profile; + + @Mock + private Jedis jedis; + + @Mock + private Pipeline pipeline; + + @Mock + private ScheduledExecutorService executor; + + @InjectMocks + private RedisSource redisSource; + + @Before + public void setUp() { + helper = new AgentBaseTestsHelper(UUID.randomUUID().toString()).setupAgentHome(); + taskBasicStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK); + instanceBasicStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE); + offsetBasicStore = + TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_OFFSET); + OffsetManager.init(taskBasicStore, instanceBasicStore, offsetBasicStore); + mockStatic(Executors.class); + when(Executors.newSingleThreadExecutor()).thenReturn(executor); + when(Executors.newScheduledThreadPool(1)).thenReturn(executor); + initProfile(); + } + + private void initProfile() { + final String username = ""; + final String password = "123456"; + final String hostname = "127.0.0.1"; + final String port = "6379"; + final String groupId = "group01"; + final String streamId = "stream01"; + final String keys = "age,name,sex"; + final String command = "zscore"; + final String subOperation = "set,del"; + + TaskProfile taskProfile = helper.getTaskProfile(1, "", "csv", false, 0L, 0L, TaskStateEnum.RUNNING, "D", + "GMT+8:00", null); + profile = taskProfile.createInstanceProfile("", + "", taskProfile.getCycleUnit(), "20240725", AgentUtils.getCurrentTime()); + profile.set(CommonConstants.PROXY_INLONG_GROUP_ID, groupId); + profile.set(CommonConstants.PROXY_INLONG_STREAM_ID, streamId); + profile.set(TaskConstants.TASK_REDIS_AUTHUSER, username); + profile.set(TaskConstants.TASK_REDIS_AUTHPASSWORD, password); + profile.set(TaskConstants.TASK_REDIS_HOSTNAME, hostname); + profile.set(TaskConstants.TASK_REDIS_PORT, port); + profile.set(TaskConstants.TASK_REDIS_COMMAND, command); + profile.set(TaskConstants.TASK_REDIS_KEYS, keys); + profile.set(TaskConstants.TASK_AUDIT_VERSION, "0"); + profile.set(TaskConstants.TASK_REDIS_SUBOPERATION, subOperation); + profile.setInstanceId(instanceId); + } + + @Test + public void testJedisStartup() { + try { + profile.setBoolean(TaskConstants.TASK_REDIS_IS_SUBSCRIBE, false); + redisSource.initSource(profile); + redisSource.releaseSource(); + } catch (Exception e) { + } + } + + @Test + public void testReplicatorStartup() { + try { + profile.setBoolean(TaskConstants.TASK_REDIS_IS_SUBSCRIBE, true); + redisSource.initSource(profile); + redisSource.releaseSource(); + } catch (Exception e) { + LOGGER.error(e.getMessage()); + } + } + + @Test + public void testScheduledExecutorStartup() { + try { + profile.setBoolean(TaskConstants.TASK_REDIS_IS_SUBSCRIBE, false); + redisSource.initSource(profile); + verify(executor, times(1)).scheduleWithFixedDelay(any(Runnable.class), eq(0L), eq(60 * 1000L), + eq(TimeUnit.MILLISECONDS)); + redisSource.releaseSource(); + } catch (Exception e) { + } + } + + @Test + public void testFetchDataByJedis_Get() + throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + when(jedis.pipelined()).thenReturn(pipeline); + when(pipeline.syncAndReturnAll()).thenReturn(Arrays.asList("value1", "value2", "value3")); + + List<String> keys = Arrays.asList("key1", "key2", "key3"); + + Map<String, Object> expectedData = new HashMap<>(); + expectedData.put("key1", "value1"); + expectedData.put("key2", "value2"); + expectedData.put("key3", "value3"); + Method method = RedisSource.class.getDeclaredMethod("fetchDataByJedis", Jedis.class, String.class, List.class, + String.class); + method.setAccessible(true); + Map<String, Object> result = (Map<String, Object>) method.invoke(redisSource, jedis, "GET", keys, null); + assertEquals(expectedData, result); + verify(jedis).pipelined(); + verify(pipeline, times(3)).get(anyString()); + verify(pipeline).syncAndReturnAll(); + executor.shutdown(); + } + + @Test + public void testFetchDataByJedis_Mget() + throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + when(jedis.mget(eq("key1"), eq("key2"), eq("key3"))).thenReturn(Arrays.asList("value1", "value2", "value3")); + List<String> keys = Arrays.asList("key1", "key2", "key3"); + Map<String, Object> expectedData = new HashMap<>(); + expectedData.put("key1", "value1"); + expectedData.put("key2", "value2"); + expectedData.put("key3", "value3"); + Method method = RedisSource.class.getDeclaredMethod("fetchDataByJedis", Jedis.class, String.class, List.class, + String.class); + method.setAccessible(true); + Map<String, Object> result = (Map<String, Object>) method.invoke(redisSource, jedis, "MGET", keys, null); + assertEquals(expectedData, result); + verify(jedis).mget(eq("key1"), eq("key2"), eq("key3")); + executor.shutdown(); + } + + @Test + public void testFetchDataByJedis_Hget() + throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + when(jedis.hget("key1", "field1")).thenReturn("hash_value1"); + when(jedis.hget("key2", "field1")).thenReturn("hash_value2"); + List<String> keys = Arrays.asList("key1", "key2"); + + Map<String, Object> expectedData = new HashMap<>(); + expectedData.put("key1", "hash_value1"); + expectedData.put("key2", "hash_value2"); + + Method method = RedisSource.class.getDeclaredMethod("fetchDataByJedis", Jedis.class, String.class, List.class, + String.class); + method.setAccessible(true); + Map<String, Object> result = (Map<String, Object>) method.invoke(redisSource, jedis, "HGET", keys, "field1"); + assertEquals(expectedData, result); + + verify(jedis, times(1)).hget("key1", "field1"); + verify(jedis, times(1)).hget("key2", "field1"); + executor.shutdown(); + } + + @Test + public void testFetchDataByJedis_Exists() + throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + when(jedis.exists("key1")).thenReturn(true); + when(jedis.exists("key2")).thenReturn(false); + + List<String> keys = Arrays.asList("key1", "key2"); + + Map<String, Object> expectedData = new HashMap<>(); + expectedData.put("key1", true); + expectedData.put("key2", false); + + Method method = RedisSource.class.getDeclaredMethod("fetchDataByJedis", Jedis.class, String.class, List.class, + String.class); + method.setAccessible(true); + Map<String, Object> result = (Map<String, Object>) method.invoke(redisSource, jedis, "EXISTS", keys, null); + assertEquals(expectedData, result); + + verify(jedis, times(1)).exists("key1"); + verify(jedis, times(1)).exists("key2"); + executor.shutdown(); + } +}