This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 84f738c91 [INLONG-7404][Agent] Fix error of Redis connector (#7405) 84f738c91 is described below commit 84f738c9132250e99496e4badcc44215089169f3 Author: haifxu <xhf1208357...@gmail.com> AuthorDate: Wed Feb 22 16:41:54 2023 +0800 [INLONG-7404][Agent] Fix error of Redis connector (#7405) --- .../apache/inlong/agent/pojo/JobProfileDto.java | 27 +++++++++++++ .../org/apache/inlong/agent/pojo/RedisJob.java | 46 ++++++++++++++++++++++ .../inlong/agent/plugin/sources/RedisSource.java | 3 +- .../agent/plugin/sources/reader/RedisReader.java | 4 ++ .../apache/inlong/common/enums/TaskTypeEnum.java | 2 + 5 files changed, 81 insertions(+), 1 deletion(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java index 74e8377af..169392f02 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java @@ -64,6 +64,10 @@ public class JobProfileDto { * oracle source */ public static final String ORACLE_SOURCE = "org.apache.inlong.agent.plugin.sources.OracleSource"; + /** + * redis source + */ + public static final String REDIS_SOURCE = "org.apache.inlong.agent.plugin.sources.RedisSource"; /** * mqtt source */ @@ -215,6 +219,22 @@ public class JobProfileDto { return postgreSQLJob; } + private static RedisJob getRedisJob(DataConfig dataConfig) { + RedisJob.RedisJobConfig config = GSON.fromJson(dataConfig.getExtParams(), RedisJob.RedisJobConfig.class); + RedisJob redisJob = new RedisJob(); + + redisJob.setAuthUser(config.getUsername()); + redisJob.setAuthPassword(config.getPassword()); + redisJob.setHostname(config.getHostname()); + redisJob.setPort(config.getPort()); + redisJob.setSsl(config.getSsl()); + redisJob.setReadTimeout(config.getTimeout()); + redisJob.setQueueSize(config.getQueueSize()); + redisJob.setReplId(config.getReplId()); + + return redisJob; + } + private static MongoJob getMongoJob(DataConfig dataConfigs) { MongoJob.MongoJobTaskConfig config = GSON.fromJson(dataConfigs.getExtParams(), @@ -440,6 +460,12 @@ public class JobProfileDto { job.setSource(MONGO_SOURCE); profileDto.setJob(job); break; + case REDIS: + RedisJob redisJob = getRedisJob(dataConfig); + job.setRedisJob(redisJob); + job.setSource(REDIS_SOURCE); + profileDto.setJob(job); + break; case MQTT: MqttJob mqttJob = getMqttJob(dataConfig); job.setMqttJob(mqttJob); @@ -481,6 +507,7 @@ public class JobProfileDto { private PostgreSQLJob postgreSQLJob; private OracleJob oracleJob; private MongoJob mongoJob; + private RedisJob redisJob; private MqttJob mqttJob; private SqlServerJob sqlserverJob; } diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/RedisJob.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/RedisJob.java new file mode 100644 index 000000000..5917f9efe --- /dev/null +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/RedisJob.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.pojo; + +import lombok.Data; + +@Data +public class RedisJob { + + private String authUser; + private String authPassword; + private String hostname; + private String port; + private Boolean ssl; + private String readTimeout; + private String queueSize; + private String replId; + + @Data + public static class RedisJobConfig { + + private String username; + private String password; + private String hostname; + private String port; + private Boolean ssl; + private String timeout; + private String queueSize; + private String replId; + } +} diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java index 48003c8de..04d51b933 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java @@ -40,7 +40,8 @@ public class RedisSource extends AbstractSource { @Override public List<Reader> split(JobProfile conf) { super.init(conf); - Reader redisReader = new RedisReader(); + RedisReader redisReader = new RedisReader(); + redisReader.setReadSource(conf.getInstanceId()); List<Reader> readerList = new ArrayList<>(); readerList.add(redisReader); sourceMetric.sourceSuccessCount.incrementAndGet(); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java index 394ffc209..62203c459 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java @@ -202,6 +202,10 @@ public class RedisReader extends AbstractReader { return instanceId; } + public void setReadSource(String instanceId) { + this.instanceId = instanceId; + } + @Override public void setReadTimeout(long mill) { diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java index 97dc71f6f..180fb0a88 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java @@ -72,6 +72,8 @@ public enum TaskTypeEnum { return MONGODB; case 10: return TUBEMQ; + case 11: + return REDIS; case 12: return MQTT; case 13: