This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b18b51104b [INLONG-11764][Manager] Support SQL stream source (#11765)
b18b51104b is described below

commit b18b51104b6d31044bf53fc2e12b5150f385a6d0
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Mon Feb 17 19:48:21 2025 +0800

    [INLONG-11764][Manager] Support SQL stream source (#11765)
---
 .../inlong/manager/common/consts/DataNodeType.java |  1 +
 .../inlong/manager/common/consts/SourceType.java   |  3 +-
 .../manager/pojo/node/sql/SqlDataNodeDTO.java      | 61 ++++++++++++++
 .../manager/pojo/node/sql/SqlDataNodeInfo.java     | 50 ++++++++++++
 .../manager/pojo/node/sql/SqlDataNodeRequest.java  | 39 +++++++++
 .../manager/pojo/source/cos/COSSourceDTO.java      |  2 -
 .../pojo/source/sql/SqlDataAddTaskRequest.java     | 36 +++++++++
 .../inlong/manager/pojo/source/sql/SqlSource.java  | 92 ++++++++++++++++++++++
 .../COSSourceDTO.java => sql/SqlSourceDTO.java}    | 70 +++++++---------
 .../manager/pojo/source/sql/SqlSourceRequest.java  | 74 +++++++++++++++++
 .../service/core/impl/AgentServiceImpl.java        | 16 ++--
 .../service/node/mysql/MySQLDataNodeOperator.java  |  2 +-
 .../SqlDataNodeOperator.java}                      | 50 +++---------
 .../service/source/AbstractSourceOperator.java     | 23 +-----
 .../service/source/StreamSourceOperator.java       |  6 ++
 .../service/source/cos/COSSourceOperator.java      | 15 ++++
 .../service/source/file/FileSourceOperator.java    | 17 ++++
 .../SqlSourceOperator.java}                        | 73 ++++++++++-------
 18 files changed, 488 insertions(+), 142 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
index 35164d31d6..e30df92573 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
@@ -30,6 +30,7 @@ public class DataNodeType {
     public static final String ELASTICSEARCH = "ELASTICSEARCH";
     public static final String MYSQL = "MYSQL";
     public static final String COS = "COS";
+    public static final String SQL = "SQL";
     public static final String STARROCKS = "STARROCKS";
     public static final String REDIS = "REDIS";
     public static final String KUDU = "KUDU";
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
index 59deca4749..263950297d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
@@ -32,7 +32,7 @@ public class SourceType extends StreamType {
 
     public static final String FILE = "FILE";
     public static final String COS = "COS";
-    public static final String MYSQL_SQL = "MYSQL_SQL";
+    public static final String SQL = "SQL";
     public static final String MYSQL_BINLOG = "MYSQL_BINLOG";
     public static final String MONGODB = "MONGODB";
     public static final String REDIS = "REDIS";
@@ -49,7 +49,6 @@ public class SourceType extends StreamType {
 
             put(FILE, TaskTypeEnum.FILE);
             put(COS, TaskTypeEnum.COS);
-            put(MYSQL_SQL, TaskTypeEnum.SQL);
             put(MYSQL_BINLOG, TaskTypeEnum.BINLOG);
             put(POSTGRESQL, TaskTypeEnum.POSTGRES);
             put(ORACLE, TaskTypeEnum.ORACLE);
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/sql/SqlDataNodeDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/sql/SqlDataNodeDTO.java
new file mode 100644
index 0000000000..4bd2a3a2ba
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/sql/SqlDataNodeDTO.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.sql;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import io.swagger.annotations.ApiModel;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Sql data node info
+ */
+@Data
+@NoArgsConstructor
+@ApiModel("Sql data node info")
+public class SqlDataNodeDTO {
+
+    /**
+     * Get the dto instance from the request
+     */
+    public static SqlDataNodeDTO getFromRequest(SqlDataNodeRequest request, String extParams) {
+        SqlDataNodeDTO dto = StringUtils.isNotBlank(extParams)
+                ? SqlDataNodeDTO.getFromJson(extParams)
+                : new SqlDataNodeDTO();
+        return CommonBeanUtils.copyProperties(request, dto, true);
+    }
+
+    /**
+     * Get the dto instance from the JSON string.
+     */
+    public static SqlDataNodeDTO getFromJson(@NotNull String extParams) {
+        try {
+            return JsonUtils.parseObject(extParams, SqlDataNodeDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT,
+                    String.format("Failed to parse extParams for Sql node: %s", e.getMessage()));
+        }
+    }
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/sql/SqlDataNodeInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/sql/SqlDataNodeInfo.java
new file mode 100644
index 0000000000..fc9e83f178
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/sql/SqlDataNodeInfo.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.sql;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+
+import io.swagger.annotations.ApiModel;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * Sql data node info
+ */
+@Data
+@SuperBuilder
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.SQL)
+@ApiModel("Sql data node info")
+public class SqlDataNodeInfo extends DataNodeInfo {
+
+    public SqlDataNodeInfo() {
+        this.setType(DataNodeType.SQL);
+    }
+
+    @Override
+    public SqlDataNodeRequest genRequest() {
+        return CommonBeanUtils.copyProperties(this, SqlDataNodeRequest::new);
+    }
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/sql/SqlDataNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/sql/SqlDataNodeRequest.java
new file mode 100644
index 0000000000..df75e3d653
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/sql/SqlDataNodeRequest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.sql;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+
+import io.swagger.annotations.ApiModel;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * Sql data node request
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.SQL)
+@ApiModel("Sql data node request")
+public class SqlDataNodeRequest extends DataNodeRequest {
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceDTO.java
index 1dd85385fc..4d2bfed790 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceDTO.java
@@ -104,10 +104,8 @@ public class COSSourceDTO {
 
     public static COSSourceDTO getFromJson(@NotNull String extParams) {
         try {
-            log.info("teste extparmas={}", extParams);
             return JsonUtils.parseObject(extParams, COSSourceDTO.class);
         } catch (Exception e) {
-            log.info("teste extparmas=eoor:", e);
             throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
                     String.format("parse extParams of COSSource failure: %s", e.getMessage()));
         }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlDataAddTaskRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlDataAddTaskRequest.java
new file mode 100644
index 0000000000..50a87b03df
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlDataAddTaskRequest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.sql;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
+
+import io.swagger.annotations.ApiModel;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = SourceType.SQL)
+@ApiModel(value = "Sql data add task request")
+public class SqlDataAddTaskRequest extends DataAddTaskRequest {
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlSource.java
new file mode 100644
index 0000000000..7bfa03dc60
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlSource.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.sql;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.source.SourceRequest;
+import org.apache.inlong.manager.pojo.source.StreamSource;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * Sql source info
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Sql source info")
+@JsonTypeDefine(value = SourceType.SQL)
+public class SqlSource extends StreamSource {
+
+    @ApiModelProperty(value = "sql", required = true)
+    private String sql;
+
+    @ApiModelProperty("Cycle unit")
+    private String cycleUnit;
+
+    @ApiModelProperty("Whether retry")
+    private Boolean retry;
+
+    @ApiModelProperty(value = "Data start time")
+    private String dataTimeFrom;
+
+    @ApiModelProperty(value = "Data end time")
+    private String dataTimeTo;
+
+    @ApiModelProperty("TimeOffset for collection, "
+            + "'1m' means from one minute after, '-1m' means from one minute before, "
+            + "'1h' means from one hour after, '-1h' means from one minute before, "
+            + "'1d' means from one day after, '-1d' means from one minute before, "
+            + "Null or blank means from current timestamp")
+    private String timeOffset;
+
+    @ApiModelProperty("Max instance count")
+    private Integer maxInstanceCount;
+
+    @ApiModelProperty("Jdbc url")
+    private String jdbcUrl;
+
+    @ApiModelProperty("Username for JDBC URL")
+    private String username;
+
+    @ApiModelProperty("jdbc password")
+    private String jdbcPassword;
+
+    @ApiModelProperty("Fetch size")
+    private Integer fetchSize;
+
+    public SqlSource() {
+        this.setSourceType(SourceType.SQL);
+    }
+
+    @Override
+    public SourceRequest genSourceRequest() {
+        return CommonBeanUtils.copyProperties(this, SqlSourceRequest::new);
+    }
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlSourceDTO.java
similarity index 58%
copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceDTO.java
copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlSourceDTO.java
index 1dd85385fc..bdcdb773cb 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlSourceDTO.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.source.cos;
+package org.apache.inlong.manager.pojo.source.sql;
 
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -32,29 +32,24 @@ import org.apache.commons.lang3.StringUtils;
 
 import javax.validation.constraints.NotNull;
 
-import java.util.List;
-
 /**
- * COS source information data transfer object
+ * Sql source information data transfer object
  */
 @Builder
 @AllArgsConstructor
 @NoArgsConstructor
 @Data
 @Slf4j
-public class COSSourceDTO {
+public class SqlSourceDTO {
 
-    @ApiModelProperty(value = "Path regex pattern for file, such as /a/b/*.txt", required = true)
-    private String pattern;
+    @ApiModelProperty(value = "sql", required = true)
+    private String sql;
 
     @ApiModelProperty("Cycle unit")
-    private String cycleUnit = "D";
+    private String cycleUnit;
 
     @ApiModelProperty("Whether retry")
-    private Boolean retry = false;;
-
-    @ApiModelProperty("Column separator of data source ")
-    private String dataSeparator;
+    private Boolean retry = false;
 
     @ApiModelProperty(value = "Data start time")
     private String dataTimeFrom;
@@ -69,47 +64,40 @@ public class COSSourceDTO {
             + "Null or blank means from current timestamp")
     private String timeOffset;
 
-    @ApiModelProperty("Max file count")
-    private String maxFileCount;
+    @ApiModelProperty("Max instance count")
+    private Integer maxInstanceCount;
 
-    @ApiModelProperty(" Type of data result for column separator"
-            + "         CSV format, set this parameter to a custom separator: , | : "
-            + "         Json format, set this parameter to json ")
-    private String contentStyle;
+    @ApiModelProperty("Jdbc url")
+    private String jdbcUrl;
 
-    @ApiModelProperty(value = "Audit version")
-    private String auditVersion;
+    @ApiModelProperty("Username for JDBC URL")
+    private String username;
 
-    @ApiModelProperty("filterStreams")
-    private List<String> filterStreams;
+    @ApiModelProperty("jdbc password")
+    private String jdbcPassword;
 
-    @ApiModelProperty(value = "COS bucket name")
-    private String bucketName;
+    @ApiModelProperty("Fetch size")
+    private Integer fetchSize;
 
-    @ApiModelProperty(value = "COS secret id")
-    private String credentialsId;
-
-    @ApiModelProperty(value = "COS secret key")
-    private String credentialsKey;
+    @ApiModelProperty("Column separator of data source ")
+    private String dataSeparator;
 
-    @ApiModelProperty(value = "COS region")
-    private String region;
+    @ApiModelProperty(value = "Audit version")
+    private String auditVersion;
 
-    public static COSSourceDTO getFromRequest(@NotNull COSSourceRequest cosSourceRequest, String extParams) {
-        COSSourceDTO dto = StringUtils.isNotBlank(extParams)
-                ? COSSourceDTO.getFromJson(extParams)
-                : new COSSourceDTO();
-        return CommonBeanUtils.copyProperties(cosSourceRequest, dto, true);
+    public static SqlSourceDTO getFromRequest(@NotNull SqlSourceRequest sqlSourceRequest, String extParams) {
+        SqlSourceDTO dto = StringUtils.isNotBlank(extParams)
+                ? SqlSourceDTO.getFromJson(extParams)
+                : new SqlSourceDTO();
+        return CommonBeanUtils.copyProperties(sqlSourceRequest, dto, true);
     }
 
-    public static COSSourceDTO getFromJson(@NotNull String extParams) {
+    public static SqlSourceDTO getFromJson(@NotNull String extParams) {
         try {
-            log.info("teste extparmas={}", extParams);
-            return JsonUtils.parseObject(extParams, COSSourceDTO.class);
+            return JsonUtils.parseObject(extParams, SqlSourceDTO.class);
         } catch (Exception e) {
-            log.info("teste extparmas=eoor:", e);
             throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
-                    String.format("parse extParams of COSSource failure: %s", e.getMessage()));
+                    String.format("parse extParams of SqlSource failure: %s", e.getMessage()));
         }
     }
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlSourceRequest.java
new file mode 100644
index 0000000000..6d76f88edd
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlSourceRequest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.sql;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.source.SourceRequest;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = SourceType.SQL)
+@ApiModel(value = "Sql source request")
+public class SqlSourceRequest extends SourceRequest {
+
+    @ApiModelProperty(value = "sql", required = true)
+    private String sql;
+
+    @ApiModelProperty("Cycle unit")
+    private String cycleUnit;
+
+    @ApiModelProperty("Whether retry")
+    private Boolean retry;
+
+    @ApiModelProperty(value = "Data start time")
+    private String dataTimeFrom;
+
+    @ApiModelProperty(value = "Data end time")
+    private String dataTimeTo;
+
+    @ApiModelProperty("TimeOffset for collection, "
+            + "'1m' means from one minute after, '-1m' means from one minute before, "
+            + "'1h' means from one hour after, '-1h' means from one minute before, "
+            + "'1d' means from one day after, '-1d' means from one minute before, "
+            + "Null or blank means from current timestamp")
+    private String timeOffset;
+
+    @ApiModelProperty("Max instance count")
+    private Integer maxInstanceCount;
+
+    @ApiModelProperty("Jdbc url")
+    private String jdbcUrl;
+
+    @ApiModelProperty("Username for JDBC URL")
+    private String username;
+
+    @ApiModelProperty("jdbc password")
+    private String jdbcPassword;
+
+    @ApiModelProperty("Fetch size")
+    private Integer fetchSize;
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 5f7958a3e1..5f4c775b10 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -401,6 +401,13 @@ public class AgentServiceImpl implements AgentService {
                     if (moduleConfig == null) {
                         continue;
                     }
+                    if (configResult != null && CollectionUtils.isNotEmpty(configResult.getModuleList())) {
+                        for (ModuleConfig config : configResult.getModuleList()) {
+                            if (Objects.equals(config.getEntityId(), moduleId)) {
+                                restartTime = config.getRestartTime();
+                            }
+                        }
+                    }
                     moduleConfig.setRestartTime(restartTime);
                     String moduleStr = GSON.toJson(moduleConfig);
                     String moduleMd5 = DigestUtils.md5Hex(moduleStr);
@@ -591,13 +598,13 @@ public class AgentServiceImpl implements AgentService {
 
     @Override
     public ConfigResult getConfig(ConfigRequest request) {
-        if (!updateModuleConfigQueue.contains(request)) {
-            updateModuleConfigQueue.add(request);
-        }
         String key = request.getLocalIp() + InlongConstants.UNDERSCORE + request.getClusterName();
 
         ConfigResult configResult = installerConfigMap.get(key);
         if (configResult == null) {
+            if (!updateModuleConfigQueue.contains(request)) {
+                updateModuleConfigQueue.add(request);
+            }
             LOGGER.debug(String.format("can not get config result for cluster name=%s, ip=%s", request.getClusterName(),
                     request.getLocalIp()));
             return null;
@@ -653,8 +660,7 @@ public class AgentServiceImpl implements AgentService {
             needAddStatusList = Arrays.asList(SourceStatus.TO_BE_ISSUED_ADD.getCode(),
                     SourceStatus.TO_BE_ISSUED_ACTIVE.getCode());
         }
-        List<String> sourceTypes = Lists.newArrayList(SourceType.MYSQL_SQL, SourceType.KAFKA,
-                SourceType.MYSQL_BINLOG, SourceType.POSTGRESQL);
+        List<String> sourceTypes = Lists.newArrayList(SourceType.KAFKA, SourceType.MYSQL_BINLOG, SourceType.POSTGRESQL);
         List<StreamSourceEntity> sourceEntities = sourceMapper.selectByStatusAndType(needAddStatusList, sourceTypes,
                 TASK_FETCH_SIZE);
         for (StreamSourceEntity sourceEntity : sourceEntities) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
index e9b3f002fd..13428de606 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
@@ -118,7 +118,7 @@ public class MySQLDataNodeOperator extends AbstractDataNodeOperator {
                 || !Objects.equals(nodeRequest.getUsername(), nodeInfo.getUsername())
                 || !Objects.equals(nodeRequest.getToken(), nodeInfo.getToken());
         if (changed) {
-            retryStreamSourceByDataNodeNameAndType(dataNodeEntity.getName(), SourceType.MYSQL_SQL, operator);
+            retryStreamSourceByDataNodeNameAndType(dataNodeEntity.getName(), SourceType.MYSQL_BINLOG, operator);
         }
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/sql/SqlDataNodeOperator.java
similarity index 60%
copy from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
copy to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/sql/SqlDataNodeOperator.java
index e9b3f002fd..7ca7d344b8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/sql/SqlDataNodeOperator.java
@@ -15,10 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.node.mysql;
+package org.apache.inlong.manager.service.node.sql;
 
 import org.apache.inlong.manager.common.consts.DataNodeType;
-import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -27,31 +26,24 @@ import org.apache.inlong.manager.dao.entity.DataNodeEntity;
 import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.node.DataNodeRequest;
 import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeDTO;
-import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeInfo;
-import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeRequest;
+import org.apache.inlong.manager.pojo.node.sql.SqlDataNodeInfo;
+import org.apache.inlong.manager.pojo.node.sql.SqlDataNodeRequest;
 import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
 import org.apache.inlong.manager.service.resource.sink.mysql.MySQLJdbcUtils;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.sql.Connection;
-import java.util.Objects;
 
 /**
- * MySQL data node operator
+ * Sql data node operator
  */
 @Service
-public class MySQLDataNodeOperator extends AbstractDataNodeOperator {
+public class SqlDataNodeOperator extends AbstractDataNodeOperator {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(MySQLDataNodeOperator.class);
-
-    @Autowired
-    private ObjectMapper objectMapper;
+    private static final Logger LOGGER = LoggerFactory.getLogger(SqlDataNodeOperator.class);
 
     @Override
     public Boolean accept(String dataNodeType) {
@@ -60,7 +52,7 @@ public class MySQLDataNodeOperator extends AbstractDataNodeOperator {
 
     @Override
     public String getDataNodeType() {
-        return DataNodeType.MYSQL;
+        return DataNodeType.SQL;
     }
 
     @Override
@@ -69,26 +61,15 @@ public class MySQLDataNodeOperator extends AbstractDataNodeOperator {
             throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND);
         }
 
-        MySQLDataNodeInfo dataNodeInfo = new MySQLDataNodeInfo();
+        SqlDataNodeInfo dataNodeInfo = new SqlDataNodeInfo();
         CommonBeanUtils.copyProperties(entity, dataNodeInfo);
-        if (StringUtils.isNotBlank(entity.getExtParams())) {
-            MySQLDataNodeDTO dto = MySQLDataNodeDTO.getFromJson(entity.getExtParams());
-            CommonBeanUtils.copyProperties(dto, dataNodeInfo);
-        }
         return dataNodeInfo;
     }
 
     @Override
     protected void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity) {
-        MySQLDataNodeRequest dataNodeRequest = (MySQLDataNodeRequest) request;
+        SqlDataNodeRequest dataNodeRequest = (SqlDataNodeRequest) request;
         CommonBeanUtils.copyProperties(dataNodeRequest, targetEntity, true);
-        try {
-            MySQLDataNodeDTO dto = MySQLDataNodeDTO.getFromRequest(dataNodeRequest, targetEntity.getExtParams());
-            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
-        } catch (Exception e) {
-            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
-                    String.format("Failed to build extParams for MySQL node: %s", e.getMessage()));
-        }
     }
 
     @Override
@@ -109,17 +90,4 @@ public class MySQLDataNodeOperator extends AbstractDataNodeOperator {
         }
     }
 
-    @Override
-    public void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity dataNodeEntity, String operator) {
-        MySQLDataNodeRequest nodeRequest = (MySQLDataNodeRequest) request;
-        MySQLDataNodeInfo nodeInfo = (MySQLDataNodeInfo) this.getFromEntity(dataNodeEntity);
-        boolean changed = !Objects.equals(nodeRequest.getUrl(), nodeInfo.getUrl())
-                || !Objects.equals(nodeRequest.getBackupUrl(), nodeInfo.getBackupUrl())
-                || !Objects.equals(nodeRequest.getUsername(), nodeInfo.getUsername())
-                || !Objects.equals(nodeRequest.getToken(), nodeInfo.getToken());
-        if (changed) {
-            retryStreamSourceByDataNodeNameAndType(dataNodeEntity.getName(), SourceType.MYSQL_SQL, operator);
-        }
-    }
-
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 83382dbcc1..c65c18b968 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -59,8 +59,6 @@ import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
 import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 import org.apache.inlong.manager.pojo.source.StreamSource;
-import org.apache.inlong.manager.pojo.source.cos.COSSourceDTO;
-import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.service.node.DataNodeService;
@@ -545,26 +543,7 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
                             ? TaskStateEnum.RUNNING.getType()
                             : TaskStateEnum.FROZEN.getType());
             dataConfig.setSyncSend(streamEntity.getSyncSend());
-            if (SourceType.FILE.equalsIgnoreCase(entity.getSourceType())) {
-                String dataSeparator = String.valueOf((char) Integer.parseInt(streamEntity.getDataSeparator()));
-                FileSourceDTO fileSourceDTO = JsonUtils.parseObject(extParams, FileSourceDTO.class);
-                if (Objects.nonNull(fileSourceDTO)) {
-                    fileSourceDTO.setDataSeparator(dataSeparator);
-                    dataConfig.setAuditVersion(fileSourceDTO.getAuditVersion());
-                    fileSourceDTO.setDataContentStyle(streamEntity.getDataType());
-                    extParams = JsonUtils.toJsonString(fileSourceDTO);
-                }
-            }
-            if (SourceType.COS.equalsIgnoreCase(entity.getSourceType())) {
-                String dataSeparator = String.valueOf((char) Integer.parseInt(streamEntity.getDataSeparator()));
-                COSSourceDTO cosSourceDTO = JsonUtils.parseObject(extParams, COSSourceDTO.class);
-                if (Objects.nonNull(cosSourceDTO)) {
-                    cosSourceDTO.setDataSeparator(dataSeparator);
-                    dataConfig.setAuditVersion(cosSourceDTO.getAuditVersion());
-                    cosSourceDTO.setContentStyle(streamEntity.getDataType());
-                    extParams = JsonUtils.toJsonString(cosSourceDTO);
-                }
-            }
+            extParams = sourceOperator.updateDataConfig(extParams, streamEntity, dataConfig);
             InlongStreamInfo streamInfo = CommonBeanUtils.copyProperties(streamEntity, InlongStreamInfo::new);
             // Processing extParams
             unpackExtParams(streamEntity.getExtParams(), streamInfo);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
index 07b0879330..19e7f61190 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.manager.service.source;
 
+import org.apache.inlong.common.pojo.agent.DataConfig;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
@@ -144,4 +146,8 @@ public interface StreamSourceOperator {
      */
     void updateAgentTaskConfig(SourceRequest request, String operator);
 
+    default String updateDataConfig(String extParams, InlongStreamEntity streamEntity, DataConfig dataConfig) {
+        return extParams;
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/cos/COSSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/cos/COSSourceOperator.java
index f99a6321bd..c533d7df7f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/cos/COSSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/cos/COSSourceOperator.java
@@ -17,12 +17,14 @@
 
 package org.apache.inlong.manager.service.source.cos;
 
+import org.apache.inlong.common.pojo.agent.DataConfig;
 import org.apache.inlong.manager.common.consts.DataNodeType;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
 import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
 import org.apache.inlong.manager.pojo.node.cos.COSDataNodeInfo;
@@ -157,4 +159,17 @@ public class COSSourceOperator extends AbstractSourceOperator {
         }
     }
 
+    @Override
+    public String updateDataConfig(String extParams, InlongStreamEntity streamEntity, DataConfig dataConfig) {
+        String dataSeparator = String.valueOf((char) Integer.parseInt(streamEntity.getDataSeparator()));
+        COSSourceDTO cosSourceDTO = JsonUtils.parseObject(extParams, COSSourceDTO.class);
+        if (Objects.nonNull(cosSourceDTO)) {
+            cosSourceDTO.setDataSeparator(dataSeparator);
+            dataConfig.setAuditVersion(cosSourceDTO.getAuditVersion());
+            cosSourceDTO.setContentStyle(streamEntity.getDataType());
+            extParams = JsonUtils.toJsonString(cosSourceDTO);
+        }
+        return extParams;
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
index faf797dc7a..90a0212aa1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java
@@ -17,10 +17,13 @@
 
 package org.apache.inlong.manager.service.source.file;
 
+import org.apache.inlong.common.pojo.agent.DataConfig;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
 import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
 import org.apache.inlong.manager.pojo.source.DataAddTaskDTO;
@@ -43,6 +46,7 @@ import org.springframework.transaction.annotation.Isolation;
 import org.springframework.transaction.annotation.Transactional;
 
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 /**
@@ -140,4 +144,17 @@ public class FileSourceOperator extends AbstractSourceOperator {
         }
     }
 
+    @Override
+    public String updateDataConfig(String extParams, InlongStreamEntity streamEntity, DataConfig dataConfig) {
+        String dataSeparator = String.valueOf((char) Integer.parseInt(streamEntity.getDataSeparator()));
+        FileSourceDTO fileSourceDTO = JsonUtils.parseObject(extParams, FileSourceDTO.class);
+        if (Objects.nonNull(fileSourceDTO)) {
+            fileSourceDTO.setDataSeparator(dataSeparator);
+            dataConfig.setAuditVersion(fileSourceDTO.getAuditVersion());
+            fileSourceDTO.setDataContentStyle(streamEntity.getDataType());
+            extParams = JsonUtils.toJsonString(fileSourceDTO);
+        }
+        return extParams;
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/cos/COSSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/sql/SqlSourceOperator.java
similarity index 69%
copy from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/cos/COSSourceOperator.java
copy to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/sql/SqlSourceOperator.java
index f99a6321bd..20425fad84 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/cos/COSSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/sql/SqlSourceOperator.java
@@ -15,25 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.source.cos;
+package org.apache.inlong.manager.service.source.sql;
 
+import org.apache.inlong.common.pojo.agent.DataConfig;
 import org.apache.inlong.manager.common.consts.DataNodeType;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
-import org.apache.inlong.manager.pojo.node.cos.COSDataNodeInfo;
+import org.apache.inlong.manager.pojo.node.sql.SqlDataNodeInfo;
 import org.apache.inlong.manager.pojo.source.DataAddTaskDTO;
 import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 import org.apache.inlong.manager.pojo.source.StreamSource;
-import org.apache.inlong.manager.pojo.source.cos.COSDataAddTaskRequest;
-import org.apache.inlong.manager.pojo.source.cos.COSSource;
-import org.apache.inlong.manager.pojo.source.cos.COSSourceDTO;
-import org.apache.inlong.manager.pojo.source.cos.COSSourceRequest;
+import org.apache.inlong.manager.pojo.source.sql.SqlDataAddTaskRequest;
+import org.apache.inlong.manager.pojo.source.sql.SqlSource;
+import org.apache.inlong.manager.pojo.source.sql.SqlSourceDTO;
+import org.apache.inlong.manager.pojo.source.sql.SqlSourceRequest;
 import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.service.source.AbstractSourceOperator;
 
@@ -51,50 +54,52 @@ import java.util.Objects;
 import java.util.stream.Collectors;
 
 /**
- * COS source operator, such as get or set COS source info.
+ * Sql source operator, such as get or set Sql source info.
  */
 @Service
-public class COSSourceOperator extends AbstractSourceOperator {
+public class SqlSourceOperator extends AbstractSourceOperator {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(COSSourceOperator.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(SqlSourceOperator.class);
 
     @Autowired
     private ObjectMapper objectMapper;
 
     @Autowired
     private StreamSourceEntityMapper sourceMapper;
+    @Autowired
+    private InlongStreamEntityMapper streamEntityMapper;
 
     @Override
     public Boolean accept(String sourceType) {
-        return SourceType.COS.equals(sourceType);
+        return SourceType.SQL.equals(sourceType);
     }
 
     @Override
     protected String getSourceType() {
-        return SourceType.COS;
+        return SourceType.SQL;
     }
 
     @Override
     protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) {
-        COSSourceRequest sourceRequest = (COSSourceRequest) request;
+        SqlSourceRequest sourceRequest = (SqlSourceRequest) request;
         try {
             CommonBeanUtils.copyProperties(sourceRequest, targetEntity, true);
-            COSSourceDTO dto = COSSourceDTO.getFromRequest(sourceRequest, targetEntity.getExtParams());
+            SqlSourceDTO dto = SqlSourceDTO.getFromRequest(sourceRequest, targetEntity.getExtParams());
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
         } catch (Exception e) {
             throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
-                    String.format("serialize extParams of COS SourceDTO failure: %s", e.getMessage()));
+                    String.format("serialize extParams of Sql SourceDTO failure: %s", e.getMessage()));
         }
     }
 
     @Override
     public StreamSource getFromEntity(StreamSourceEntity entity) {
-        COSSource source = new COSSource();
+        SqlSource source = new SqlSource();
         if (entity == null) {
             return source;
         }
 
-        COSSourceDTO dto = COSSourceDTO.getFromJson(entity.getExtParams());
+        SqlSourceDTO dto = SqlSourceDTO.getFromJson(entity.getExtParams());
         CommonBeanUtils.copyProperties(entity, source, true);
         CommonBeanUtils.copyProperties(dto, source, true);
 
@@ -113,13 +118,15 @@ public class COSSourceOperator extends AbstractSourceOperator {
 
     @Override
     public String getExtParams(StreamSourceEntity sourceEntity) {
-        COSSourceDTO cosSourceDTO = COSSourceDTO.getFromJson(sourceEntity.getExtParams());
-        if (Objects.nonNull(cosSourceDTO) && StringUtils.isNotBlank(sourceEntity.getDataNodeName())) {
-            COSDataNodeInfo dataNodeInfo =
-                    (COSDataNodeInfo) dataNodeService.getByKeyWithoutTenant(sourceEntity.getDataNodeName(),
-                            DataNodeType.COS);
-            CommonBeanUtils.copyProperties(dataNodeInfo, cosSourceDTO, true);
-            return JsonUtils.toJsonString(cosSourceDTO);
+        SqlSourceDTO sqlSourceDTO = SqlSourceDTO.getFromJson(sourceEntity.getExtParams());
+        if (Objects.nonNull(sqlSourceDTO) && StringUtils.isNotBlank(sourceEntity.getDataNodeName())) {
+            SqlDataNodeInfo dataNodeInfo =
+                    (SqlDataNodeInfo) dataNodeService.getByKeyWithoutTenant(sourceEntity.getDataNodeName(),
+                            DataNodeType.SQL);
+            sqlSourceDTO.setJdbcUrl(dataNodeInfo.getUrl());
+            sqlSourceDTO.setUsername(dataNodeInfo.getUsername());
+            sqlSourceDTO.setJdbcPassword(dataNodeInfo.getToken());
+            return JsonUtils.toJsonString(sqlSourceDTO);
         }
         return sourceEntity.getExtParams();
     }
@@ -128,16 +135,15 @@ public class COSSourceOperator extends AbstractSourceOperator {
     @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
     public Integer addDataAddTask(DataAddTaskRequest request, String operator) {
         try {
-            COSDataAddTaskRequest sourceRequest = (COSDataAddTaskRequest) request;
+            SqlDataAddTaskRequest sourceRequest = (SqlDataAddTaskRequest) request;
             StreamSourceEntity sourceEntity = sourceMapper.selectById(request.getSourceId());
-            COSSourceDTO dto = COSSourceDTO.getFromJson(sourceEntity.getExtParams());
+            SqlSourceDTO dto = SqlSourceDTO.getFromJson(sourceEntity.getExtParams());
             dto.setDataTimeFrom(sourceRequest.getDataTimeFrom());
             dto.setDataTimeTo(sourceRequest.getDataTimeTo());
             dto.setRetry(true);
             if (request.getIncreaseAuditVersion()) {
                 dto.setAuditVersion(request.getAuditVersion());
             }
-            dto.setFilterStreams(sourceRequest.getFilterStreams());
             StreamSourceEntity dataAddTaskEntity =
                     CommonBeanUtils.copyProperties(sourceEntity, StreamSourceEntity::new);
             dataAddTaskEntity.setId(null);
@@ -151,10 +157,21 @@ public class COSSourceOperator extends AbstractSourceOperator {
             updateAgentTaskConfig(dataAddTaskRequest, operator);
             return id;
         } catch (Exception e) {
-            LOGGER.error("serialize extParams of COS SourceDTO failure: ", e);
+            LOGGER.error("serialize extParams of Sql SourceDTO failure: ", e);
             throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
-                    String.format("serialize extParams of COS SourceDTO failure: %s", e.getMessage()));
+                    String.format("serialize extParams of Sql SourceDTO failure: %s", e.getMessage()));
         }
     }
 
+    @Override
+    public String updateDataConfig(String extParams, InlongStreamEntity streamEntity, DataConfig dataConfig) {
+        String dataSeparator = String.valueOf((char) Integer.parseInt(streamEntity.getDataSeparator()));
+        SqlSourceDTO sqlSourceDTO = JsonUtils.parseObject(extParams, SqlSourceDTO.class);
+        if (Objects.nonNull(sqlSourceDTO)) {
+            sqlSourceDTO.setDataSeparator(dataSeparator);
+            dataConfig.setAuditVersion(sqlSourceDTO.getAuditVersion());
+            extParams = JsonUtils.toJsonString(sqlSourceDTO);
+        }
+        return extParams;
+    }
 }

Reply via email to