This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 84f738c91 [INLONG-7404][Agent] Fix error of Redis connector (#7405)
84f738c91 is described below

commit 84f738c9132250e99496e4badcc44215089169f3
Author: haifxu <xhf1208357...@gmail.com>
AuthorDate: Wed Feb 22 16:41:54 2023 +0800

    [INLONG-7404][Agent] Fix error of Redis connector (#7405)
---
 .../apache/inlong/agent/pojo/JobProfileDto.java    | 27 +++++++++++++
 .../org/apache/inlong/agent/pojo/RedisJob.java     | 46 ++++++++++++++++++++++
 .../inlong/agent/plugin/sources/RedisSource.java   |  3 +-
 .../agent/plugin/sources/reader/RedisReader.java   |  4 ++
 .../apache/inlong/common/enums/TaskTypeEnum.java   |  2 +
 5 files changed, 81 insertions(+), 1 deletion(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index 74e8377af..169392f02 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -64,6 +64,10 @@ public class JobProfileDto {
      * oracle source
      */
     public static final String ORACLE_SOURCE = 
"org.apache.inlong.agent.plugin.sources.OracleSource";
+    /**
+     * redis source
+     */
+    public static final String REDIS_SOURCE = 
"org.apache.inlong.agent.plugin.sources.RedisSource";
     /**
      * mqtt source
      */
@@ -215,6 +219,22 @@ public class JobProfileDto {
         return postgreSQLJob;
     }
 
+    private static RedisJob getRedisJob(DataConfig dataConfig) {
+        RedisJob.RedisJobConfig config = 
GSON.fromJson(dataConfig.getExtParams(), RedisJob.RedisJobConfig.class);
+        RedisJob redisJob = new RedisJob();
+
+        redisJob.setAuthUser(config.getUsername());
+        redisJob.setAuthPassword(config.getPassword());
+        redisJob.setHostname(config.getHostname());
+        redisJob.setPort(config.getPort());
+        redisJob.setSsl(config.getSsl());
+        redisJob.setReadTimeout(config.getTimeout());
+        redisJob.setQueueSize(config.getQueueSize());
+        redisJob.setReplId(config.getReplId());
+
+        return redisJob;
+    }
+
     private static MongoJob getMongoJob(DataConfig dataConfigs) {
 
         MongoJob.MongoJobTaskConfig config = 
GSON.fromJson(dataConfigs.getExtParams(),
@@ -440,6 +460,12 @@ public class JobProfileDto {
                 job.setSource(MONGO_SOURCE);
                 profileDto.setJob(job);
                 break;
+            case REDIS:
+                RedisJob redisJob = getRedisJob(dataConfig);
+                job.setRedisJob(redisJob);
+                job.setSource(REDIS_SOURCE);
+                profileDto.setJob(job);
+                break;
             case MQTT:
                 MqttJob mqttJob = getMqttJob(dataConfig);
                 job.setMqttJob(mqttJob);
@@ -481,6 +507,7 @@ public class JobProfileDto {
         private PostgreSQLJob postgreSQLJob;
         private OracleJob oracleJob;
         private MongoJob mongoJob;
+        private RedisJob redisJob;
         private MqttJob mqttJob;
         private SqlServerJob sqlserverJob;
     }
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/RedisJob.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/RedisJob.java
new file mode 100644
index 000000000..5917f9efe
--- /dev/null
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/RedisJob.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.pojo;
+
+import lombok.Data;
+
+@Data
+public class RedisJob {
+
+    private String authUser;
+    private String authPassword;
+    private String hostname;
+    private String port;
+    private Boolean ssl;
+    private String readTimeout;
+    private String queueSize;
+    private String replId;
+
+    @Data
+    public static class RedisJobConfig {
+
+        private String username;
+        private String password;
+        private String hostname;
+        private String port;
+        private Boolean ssl;
+        private String timeout;
+        private String queueSize;
+        private String replId;
+    }
+}
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java
index 48003c8de..04d51b933 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/RedisSource.java
@@ -40,7 +40,8 @@ public class RedisSource extends AbstractSource {
     @Override
     public List<Reader> split(JobProfile conf) {
         super.init(conf);
-        Reader redisReader = new RedisReader();
+        RedisReader redisReader = new RedisReader();
+        redisReader.setReadSource(conf.getInstanceId());
         List<Reader> readerList = new ArrayList<>();
         readerList.add(redisReader);
         sourceMetric.sourceSuccessCount.incrementAndGet();
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java
index 394ffc209..62203c459 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java
@@ -202,6 +202,10 @@ public class RedisReader extends AbstractReader {
         return instanceId;
     }
 
+    public void setReadSource(String instanceId) {
+        this.instanceId = instanceId;
+    }
+
     @Override
     public void setReadTimeout(long mill) {
 
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java 
b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
index 97dc71f6f..180fb0a88 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
@@ -72,6 +72,8 @@ public enum TaskTypeEnum {
                 return MONGODB;
             case 10:
                 return TUBEMQ;
+            case 11:
+                return REDIS;
             case 12:
                 return MQTT;
             case 13:

Reply via email to