This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new b18b51104b [INLONG-11764][Manager] Support SQL stream source (#11765) b18b51104b is described below commit b18b51104b6d31044bf53fc2e12b5150f385a6d0 Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Mon Feb 17 19:48:21 2025 +0800 [INLONG-11764][Manager] Support SQL stream source (#11765) --- .../inlong/manager/common/consts/DataNodeType.java | 1 + .../inlong/manager/common/consts/SourceType.java | 3 +- .../manager/pojo/node/sql/SqlDataNodeDTO.java | 61 ++++++++++++++ .../manager/pojo/node/sql/SqlDataNodeInfo.java | 50 ++++++++++++ .../manager/pojo/node/sql/SqlDataNodeRequest.java | 39 +++++++++ .../manager/pojo/source/cos/COSSourceDTO.java | 2 - .../pojo/source/sql/SqlDataAddTaskRequest.java | 36 +++++++++ .../inlong/manager/pojo/source/sql/SqlSource.java | 92 ++++++++++++++++++++++ .../COSSourceDTO.java => sql/SqlSourceDTO.java} | 70 +++++++--------- .../manager/pojo/source/sql/SqlSourceRequest.java | 74 +++++++++++++++++ .../service/core/impl/AgentServiceImpl.java | 16 ++-- .../service/node/mysql/MySQLDataNodeOperator.java | 2 +- .../SqlDataNodeOperator.java} | 50 +++--------- .../service/source/AbstractSourceOperator.java | 23 +----- .../service/source/StreamSourceOperator.java | 6 ++ .../service/source/cos/COSSourceOperator.java | 15 ++++ .../service/source/file/FileSourceOperator.java | 17 ++++ .../SqlSourceOperator.java} | 73 ++++++++++------- 18 files changed, 488 insertions(+), 142 deletions(-) diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java index 35164d31d6..e30df92573 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java @@ -30,6 +30,7 @@ public class DataNodeType { public static final String ELASTICSEARCH = "ELASTICSEARCH"; public static final String MYSQL = "MYSQL"; public static final String COS = "COS"; + public static final String SQL = "SQL"; public static final String STARROCKS = "STARROCKS"; public static final String REDIS = "REDIS"; public static final String KUDU = "KUDU"; diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java index 59deca4749..263950297d 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java @@ -32,7 +32,7 @@ public class SourceType extends StreamType { public static final String FILE = "FILE"; public static final String COS = "COS"; - public static final String MYSQL_SQL = "MYSQL_SQL"; + public static final String SQL = "SQL"; public static final String MYSQL_BINLOG = "MYSQL_BINLOG"; public static final String MONGODB = "MONGODB"; public static final String REDIS = "REDIS"; @@ -49,7 +49,6 @@ public class SourceType extends StreamType { put(FILE, TaskTypeEnum.FILE); put(COS, TaskTypeEnum.COS); - put(MYSQL_SQL, TaskTypeEnum.SQL); put(MYSQL_BINLOG, TaskTypeEnum.BINLOG); put(POSTGRESQL, TaskTypeEnum.POSTGRES); put(ORACLE, TaskTypeEnum.ORACLE); diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/sql/SqlDataNodeDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/sql/SqlDataNodeDTO.java new file mode 100644 index 0000000000..4bd2a3a2ba --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/sql/SqlDataNodeDTO.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.node.sql; + +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonUtils; + +import io.swagger.annotations.ApiModel; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.commons.lang3.StringUtils; + +import javax.validation.constraints.NotNull; + +/** + * Sql data node info + */ +@Data +@NoArgsConstructor +@ApiModel("Sql data node info") +public class SqlDataNodeDTO { + + /** + * Get the dto instance from the request + */ + public static SqlDataNodeDTO getFromRequest(SqlDataNodeRequest request, String extParams) { + SqlDataNodeDTO dto = StringUtils.isNotBlank(extParams) + ? SqlDataNodeDTO.getFromJson(extParams) + : new SqlDataNodeDTO(); + return CommonBeanUtils.copyProperties(request, dto, true); + } + + /** + * Get the dto instance from the JSON string. + */ + public static SqlDataNodeDTO getFromJson(@NotNull String extParams) { + try { + return JsonUtils.parseObject(extParams, SqlDataNodeDTO.class); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT, + String.format("Failed to parse extParams for Sql node: %s", e.getMessage())); + } + } +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/sql/SqlDataNodeInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/sql/SqlDataNodeInfo.java new file mode 100644 index 0000000000..fc9e83f178 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/sql/SqlDataNodeInfo.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.node.sql; + +import org.apache.inlong.manager.common.consts.DataNodeType; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.node.DataNodeInfo; + +import io.swagger.annotations.ApiModel; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import lombok.experimental.SuperBuilder; + +/** + * Sql data node info + */ +@Data +@SuperBuilder +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = DataNodeType.SQL) +@ApiModel("Sql data node info") +public class SqlDataNodeInfo extends DataNodeInfo { + + public SqlDataNodeInfo() { + this.setType(DataNodeType.SQL); + } + + @Override + public SqlDataNodeRequest genRequest() { + return CommonBeanUtils.copyProperties(this, SqlDataNodeRequest::new); + } +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/sql/SqlDataNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/sql/SqlDataNodeRequest.java new file mode 100644 index 0000000000..df75e3d653 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/sql/SqlDataNodeRequest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.node.sql; + +import org.apache.inlong.manager.common.consts.DataNodeType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.node.DataNodeRequest; + +import io.swagger.annotations.ApiModel; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +/** + * Sql data node request + */ +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = DataNodeType.SQL) +@ApiModel("Sql data node request") +public class SqlDataNodeRequest extends DataNodeRequest { + +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceDTO.java index 1dd85385fc..4d2bfed790 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceDTO.java @@ -104,10 +104,8 @@ public class COSSourceDTO { public static COSSourceDTO getFromJson(@NotNull String extParams) { try { - log.info("teste extparmas={}", extParams); return JsonUtils.parseObject(extParams, COSSourceDTO.class); } catch (Exception e) { - log.info("teste extparmas=eoor:", e); throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT, String.format("parse extParams of COSSource failure: %s", e.getMessage())); } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlDataAddTaskRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlDataAddTaskRequest.java new file mode 100644 index 0000000000..50a87b03df --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlDataAddTaskRequest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.source.sql; + +import org.apache.inlong.manager.common.consts.SourceType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.source.DataAddTaskRequest; + +import io.swagger.annotations.ApiModel; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = SourceType.SQL) +@ApiModel(value = "Sql data add task request") +public class SqlDataAddTaskRequest extends DataAddTaskRequest { + +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlSource.java new file mode 100644 index 0000000000..7bfa03dc60 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlSource.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.source.sql; + +import org.apache.inlong.manager.common.consts.SourceType; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.source.SourceRequest; +import org.apache.inlong.manager.pojo.source.StreamSource; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import lombok.experimental.SuperBuilder; + +/** + * Sql source info + */ +@Data +@SuperBuilder +@AllArgsConstructor +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@ApiModel(value = "Sql source info") +@JsonTypeDefine(value = SourceType.SQL) +public class SqlSource extends StreamSource { + + @ApiModelProperty(value = "sql", required = true) + private String sql; + + @ApiModelProperty("Cycle unit") + private String cycleUnit; + + @ApiModelProperty("Whether retry") + private Boolean retry; + + @ApiModelProperty(value = "Data start time") + private String dataTimeFrom; + + @ApiModelProperty(value = "Data end time") + private String dataTimeTo; + + @ApiModelProperty("TimeOffset for collection, " + + "'1m' means from one minute after, '-1m' means from one minute before, " + + "'1h' means from one hour after, '-1h' means from one minute before, " + + "'1d' means from one day after, '-1d' means from one minute before, " + + "Null or blank means from current timestamp") + private String timeOffset; + + @ApiModelProperty("Max instance count") + private Integer maxInstanceCount; + + @ApiModelProperty("Jdbc url") + private String jdbcUrl; + + @ApiModelProperty("Username for JDBC URL") + private String username; + + @ApiModelProperty("jdbc password") + private String jdbcPassword; + + @ApiModelProperty("Fetch size") + private Integer fetchSize; + + public SqlSource() { + this.setSourceType(SourceType.SQL); + } + + @Override + public SourceRequest genSourceRequest() { + return CommonBeanUtils.copyProperties(this, SqlSourceRequest::new); + } + +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlSourceDTO.java similarity index 58% copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceDTO.java copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlSourceDTO.java index 1dd85385fc..bdcdb773cb 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/cos/COSSourceDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlSourceDTO.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.manager.pojo.source.cos; +package org.apache.inlong.manager.pojo.source.sql; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; @@ -32,29 +32,24 @@ import org.apache.commons.lang3.StringUtils; import javax.validation.constraints.NotNull; -import java.util.List; - /** - * COS source information data transfer object + * Sql source information data transfer object */ @Builder @AllArgsConstructor @NoArgsConstructor @Data @Slf4j -public class COSSourceDTO { +public class SqlSourceDTO { - @ApiModelProperty(value = "Path regex pattern for file, such as /a/b/*.txt", required = true) - private String pattern; + @ApiModelProperty(value = "sql", required = true) + private String sql; @ApiModelProperty("Cycle unit") - private String cycleUnit = "D"; + private String cycleUnit; @ApiModelProperty("Whether retry") - private Boolean retry = false;; - - @ApiModelProperty("Column separator of data source ") - private String dataSeparator; + private Boolean retry = false; @ApiModelProperty(value = "Data start time") private String dataTimeFrom; @@ -69,47 +64,40 @@ public class COSSourceDTO { + "Null or blank means from current timestamp") private String timeOffset; - @ApiModelProperty("Max file count") - private String maxFileCount; + @ApiModelProperty("Max instance count") + private Integer maxInstanceCount; - @ApiModelProperty(" Type of data result for column separator" - + " CSV format, set this parameter to a custom separator: , | : " - + " Json format, set this parameter to json ") - private String contentStyle; + @ApiModelProperty("Jdbc url") + private String jdbcUrl; - @ApiModelProperty(value = "Audit version") - private String auditVersion; + @ApiModelProperty("Username for JDBC URL") + private String username; - @ApiModelProperty("filterStreams") - private List<String> filterStreams; + @ApiModelProperty("jdbc password") + private String jdbcPassword; - @ApiModelProperty(value = "COS bucket name") - private String bucketName; + @ApiModelProperty("Fetch size") + private Integer fetchSize; - @ApiModelProperty(value = "COS secret id") - private String credentialsId; - - @ApiModelProperty(value = "COS secret key") - private String credentialsKey; + @ApiModelProperty("Column separator of data source ") + private String dataSeparator; - @ApiModelProperty(value = "COS region") - private String region; + @ApiModelProperty(value = "Audit version") + private String auditVersion; - public static COSSourceDTO getFromRequest(@NotNull COSSourceRequest cosSourceRequest, String extParams) { - COSSourceDTO dto = StringUtils.isNotBlank(extParams) - ? COSSourceDTO.getFromJson(extParams) - : new COSSourceDTO(); - return CommonBeanUtils.copyProperties(cosSourceRequest, dto, true); + public static SqlSourceDTO getFromRequest(@NotNull SqlSourceRequest sqlSourceRequest, String extParams) { + SqlSourceDTO dto = StringUtils.isNotBlank(extParams) + ? SqlSourceDTO.getFromJson(extParams) + : new SqlSourceDTO(); + return CommonBeanUtils.copyProperties(sqlSourceRequest, dto, true); } - public static COSSourceDTO getFromJson(@NotNull String extParams) { + public static SqlSourceDTO getFromJson(@NotNull String extParams) { try { - log.info("teste extparmas={}", extParams); - return JsonUtils.parseObject(extParams, COSSourceDTO.class); + return JsonUtils.parseObject(extParams, SqlSourceDTO.class); } catch (Exception e) { - log.info("teste extparmas=eoor:", e); throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT, - String.format("parse extParams of COSSource failure: %s", e.getMessage())); + String.format("parse extParams of SqlSource failure: %s", e.getMessage())); } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlSourceRequest.java new file mode 100644 index 0000000000..6d76f88edd --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/sql/SqlSourceRequest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.source.sql; + +import org.apache.inlong.manager.common.consts.SourceType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.source.SourceRequest; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = SourceType.SQL) +@ApiModel(value = "Sql source request") +public class SqlSourceRequest extends SourceRequest { + + @ApiModelProperty(value = "sql", required = true) + private String sql; + + @ApiModelProperty("Cycle unit") + private String cycleUnit; + + @ApiModelProperty("Whether retry") + private Boolean retry; + + @ApiModelProperty(value = "Data start time") + private String dataTimeFrom; + + @ApiModelProperty(value = "Data end time") + private String dataTimeTo; + + @ApiModelProperty("TimeOffset for collection, " + + "'1m' means from one minute after, '-1m' means from one minute before, " + + "'1h' means from one hour after, '-1h' means from one minute before, " + + "'1d' means from one day after, '-1d' means from one minute before, " + + "Null or blank means from current timestamp") + private String timeOffset; + + @ApiModelProperty("Max instance count") + private Integer maxInstanceCount; + + @ApiModelProperty("Jdbc url") + private String jdbcUrl; + + @ApiModelProperty("Username for JDBC URL") + private String username; + + @ApiModelProperty("jdbc password") + private String jdbcPassword; + + @ApiModelProperty("Fetch size") + private Integer fetchSize; + +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java index 5f7958a3e1..5f4c775b10 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java @@ -401,6 +401,13 @@ public class AgentServiceImpl implements AgentService { if (moduleConfig == null) { continue; } + if (configResult != null && CollectionUtils.isNotEmpty(configResult.getModuleList())) { + for (ModuleConfig config : configResult.getModuleList()) { + if (Objects.equals(config.getEntityId(), moduleId)) { + restartTime = config.getRestartTime(); + } + } + } moduleConfig.setRestartTime(restartTime); String moduleStr = GSON.toJson(moduleConfig); String moduleMd5 = DigestUtils.md5Hex(moduleStr); @@ -591,13 +598,13 @@ public class AgentServiceImpl implements AgentService { @Override public ConfigResult getConfig(ConfigRequest request) { - if (!updateModuleConfigQueue.contains(request)) { - updateModuleConfigQueue.add(request); - } String key = request.getLocalIp() + InlongConstants.UNDERSCORE + request.getClusterName(); ConfigResult configResult = installerConfigMap.get(key); if (configResult == null) { + if (!updateModuleConfigQueue.contains(request)) { + updateModuleConfigQueue.add(request); + } LOGGER.debug(String.format("can not get config result for cluster name=%s, ip=%s", request.getClusterName(), request.getLocalIp())); return null; @@ -653,8 +660,7 @@ public class AgentServiceImpl implements AgentService { needAddStatusList = Arrays.asList(SourceStatus.TO_BE_ISSUED_ADD.getCode(), SourceStatus.TO_BE_ISSUED_ACTIVE.getCode()); } - List<String> sourceTypes = Lists.newArrayList(SourceType.MYSQL_SQL, SourceType.KAFKA, - SourceType.MYSQL_BINLOG, SourceType.POSTGRESQL); + List<String> sourceTypes = Lists.newArrayList(SourceType.KAFKA, SourceType.MYSQL_BINLOG, SourceType.POSTGRESQL); List<StreamSourceEntity> sourceEntities = sourceMapper.selectByStatusAndType(needAddStatusList, sourceTypes, TASK_FETCH_SIZE); for (StreamSourceEntity sourceEntity : sourceEntities) { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java index e9b3f002fd..13428de606 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java @@ -118,7 +118,7 @@ public class MySQLDataNodeOperator extends AbstractDataNodeOperator { || !Objects.equals(nodeRequest.getUsername(), nodeInfo.getUsername()) || !Objects.equals(nodeRequest.getToken(), nodeInfo.getToken()); if (changed) { - retryStreamSourceByDataNodeNameAndType(dataNodeEntity.getName(), SourceType.MYSQL_SQL, operator); + retryStreamSourceByDataNodeNameAndType(dataNodeEntity.getName(), SourceType.MYSQL_BINLOG, operator); } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/sql/SqlDataNodeOperator.java similarity index 60% copy from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java copy to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/sql/SqlDataNodeOperator.java index e9b3f002fd..7ca7d344b8 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/sql/SqlDataNodeOperator.java @@ -15,10 +15,9 @@ * limitations under the License. */ -package org.apache.inlong.manager.service.node.mysql; +package org.apache.inlong.manager.service.node.sql; import org.apache.inlong.manager.common.consts.DataNodeType; -import org.apache.inlong.manager.common.consts.SourceType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; @@ -27,31 +26,24 @@ import org.apache.inlong.manager.dao.entity.DataNodeEntity; import org.apache.inlong.manager.pojo.node.DataNodeInfo; import org.apache.inlong.manager.pojo.node.DataNodeRequest; import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeDTO; -import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeInfo; -import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeRequest; +import org.apache.inlong.manager.pojo.node.sql.SqlDataNodeInfo; +import org.apache.inlong.manager.pojo.node.sql.SqlDataNodeRequest; import org.apache.inlong.manager.service.node.AbstractDataNodeOperator; import org.apache.inlong.manager.service.resource.sink.mysql.MySQLJdbcUtils; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.sql.Connection; -import java.util.Objects; /** - * MySQL data node operator + * Sql data node operator */ @Service -public class MySQLDataNodeOperator extends AbstractDataNodeOperator { +public class SqlDataNodeOperator extends AbstractDataNodeOperator { - private static final Logger LOGGER = LoggerFactory.getLogger(MySQLDataNodeOperator.class); - - @Autowired - private ObjectMapper objectMapper; + private static final Logger LOGGER = LoggerFactory.getLogger(SqlDataNodeOperator.class); @Override public Boolean accept(String dataNodeType) { @@ -60,7 +52,7 @@ public class MySQLDataNodeOperator extends AbstractDataNodeOperator { @Override public String getDataNodeType() { - return DataNodeType.MYSQL; + return DataNodeType.SQL; } @Override @@ -69,26 +61,15 @@ public class MySQLDataNodeOperator extends AbstractDataNodeOperator { throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND); } - MySQLDataNodeInfo dataNodeInfo = new MySQLDataNodeInfo(); + SqlDataNodeInfo dataNodeInfo = new SqlDataNodeInfo(); CommonBeanUtils.copyProperties(entity, dataNodeInfo); - if (StringUtils.isNotBlank(entity.getExtParams())) { - MySQLDataNodeDTO dto = MySQLDataNodeDTO.getFromJson(entity.getExtParams()); - CommonBeanUtils.copyProperties(dto, dataNodeInfo); - } return dataNodeInfo; } @Override protected void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity) { - MySQLDataNodeRequest dataNodeRequest = (MySQLDataNodeRequest) request; + SqlDataNodeRequest dataNodeRequest = (SqlDataNodeRequest) request; CommonBeanUtils.copyProperties(dataNodeRequest, targetEntity, true); - try { - MySQLDataNodeDTO dto = MySQLDataNodeDTO.getFromRequest(dataNodeRequest, targetEntity.getExtParams()); - targetEntity.setExtParams(objectMapper.writeValueAsString(dto)); - } catch (Exception e) { - throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT, - String.format("Failed to build extParams for MySQL node: %s", e.getMessage())); - } } @Override @@ -109,17 +90,4 @@ public class MySQLDataNodeOperator extends AbstractDataNodeOperator { } } - @Override - public void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity dataNodeEntity, String operator) { - MySQLDataNodeRequest nodeRequest = (MySQLDataNodeRequest) request; - MySQLDataNodeInfo nodeInfo = (MySQLDataNodeInfo) this.getFromEntity(dataNodeEntity); - boolean changed = !Objects.equals(nodeRequest.getUrl(), nodeInfo.getUrl()) - || !Objects.equals(nodeRequest.getBackupUrl(), nodeInfo.getBackupUrl()) - || !Objects.equals(nodeRequest.getUsername(), nodeInfo.getUsername()) - || !Objects.equals(nodeRequest.getToken(), nodeInfo.getToken()); - if (changed) { - retryStreamSourceByDataNodeNameAndType(dataNodeEntity.getName(), SourceType.MYSQL_SQL, operator); - } - } - } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java index 83382dbcc1..c65c18b968 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java @@ -59,8 +59,6 @@ import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO; import org.apache.inlong.manager.pojo.source.DataAddTaskRequest; import org.apache.inlong.manager.pojo.source.SourceRequest; import org.apache.inlong.manager.pojo.source.StreamSource; -import org.apache.inlong.manager.pojo.source.cos.COSSourceDTO; -import org.apache.inlong.manager.pojo.source.file.FileSourceDTO; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.pojo.stream.StreamField; import org.apache.inlong.manager.service.node.DataNodeService; @@ -545,26 +543,7 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator { ? TaskStateEnum.RUNNING.getType() : TaskStateEnum.FROZEN.getType()); dataConfig.setSyncSend(streamEntity.getSyncSend()); - if (SourceType.FILE.equalsIgnoreCase(entity.getSourceType())) { - String dataSeparator = String.valueOf((char) Integer.parseInt(streamEntity.getDataSeparator())); - FileSourceDTO fileSourceDTO = JsonUtils.parseObject(extParams, FileSourceDTO.class); - if (Objects.nonNull(fileSourceDTO)) { - fileSourceDTO.setDataSeparator(dataSeparator); - dataConfig.setAuditVersion(fileSourceDTO.getAuditVersion()); - fileSourceDTO.setDataContentStyle(streamEntity.getDataType()); - extParams = JsonUtils.toJsonString(fileSourceDTO); - } - } - if (SourceType.COS.equalsIgnoreCase(entity.getSourceType())) { - String dataSeparator = String.valueOf((char) Integer.parseInt(streamEntity.getDataSeparator())); - COSSourceDTO cosSourceDTO = JsonUtils.parseObject(extParams, COSSourceDTO.class); - if (Objects.nonNull(cosSourceDTO)) { - cosSourceDTO.setDataSeparator(dataSeparator); - dataConfig.setAuditVersion(cosSourceDTO.getAuditVersion()); - cosSourceDTO.setContentStyle(streamEntity.getDataType()); - extParams = JsonUtils.toJsonString(cosSourceDTO); - } - } + extParams = sourceOperator.updateDataConfig(extParams, streamEntity, dataConfig); InlongStreamInfo streamInfo = CommonBeanUtils.copyProperties(streamEntity, InlongStreamInfo::new); // Processing extParams unpackExtParams(streamEntity.getExtParams(), streamInfo); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java index 07b0879330..19e7f61190 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java @@ -17,6 +17,8 @@ package org.apache.inlong.manager.service.source; +import org.apache.inlong.common.pojo.agent.DataConfig; +import org.apache.inlong.manager.dao.entity.InlongStreamEntity; import org.apache.inlong.manager.dao.entity.StreamSourceEntity; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; @@ -144,4 +146,8 @@ public interface StreamSourceOperator { */ void updateAgentTaskConfig(SourceRequest request, String operator); + default String updateDataConfig(String extParams, InlongStreamEntity streamEntity, DataConfig dataConfig) { + return extParams; + } + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/cos/COSSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/cos/COSSourceOperator.java index f99a6321bd..c533d7df7f 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/cos/COSSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/cos/COSSourceOperator.java @@ -17,12 +17,14 @@ package org.apache.inlong.manager.service.source.cos; +import org.apache.inlong.common.pojo.agent.DataConfig; import org.apache.inlong.manager.common.consts.DataNodeType; import org.apache.inlong.manager.common.consts.SourceType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.dao.entity.InlongStreamEntity; import org.apache.inlong.manager.dao.entity.StreamSourceEntity; import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper; import org.apache.inlong.manager.pojo.node.cos.COSDataNodeInfo; @@ -157,4 +159,17 @@ public class COSSourceOperator extends AbstractSourceOperator { } } + @Override + public String updateDataConfig(String extParams, InlongStreamEntity streamEntity, DataConfig dataConfig) { + String dataSeparator = String.valueOf((char) Integer.parseInt(streamEntity.getDataSeparator())); + COSSourceDTO cosSourceDTO = JsonUtils.parseObject(extParams, COSSourceDTO.class); + if (Objects.nonNull(cosSourceDTO)) { + cosSourceDTO.setDataSeparator(dataSeparator); + dataConfig.setAuditVersion(cosSourceDTO.getAuditVersion()); + cosSourceDTO.setContentStyle(streamEntity.getDataType()); + extParams = JsonUtils.toJsonString(cosSourceDTO); + } + return extParams; + } + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java index faf797dc7a..90a0212aa1 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java @@ -17,10 +17,13 @@ package org.apache.inlong.manager.service.source.file; +import org.apache.inlong.common.pojo.agent.DataConfig; import org.apache.inlong.manager.common.consts.SourceType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.dao.entity.InlongStreamEntity; import org.apache.inlong.manager.dao.entity.StreamSourceEntity; import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper; import org.apache.inlong.manager.pojo.source.DataAddTaskDTO; @@ -43,6 +46,7 @@ import org.springframework.transaction.annotation.Isolation; import org.springframework.transaction.annotation.Transactional; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; /** @@ -140,4 +144,17 @@ public class FileSourceOperator extends AbstractSourceOperator { } } + @Override + public String updateDataConfig(String extParams, InlongStreamEntity streamEntity, DataConfig dataConfig) { + String dataSeparator = String.valueOf((char) Integer.parseInt(streamEntity.getDataSeparator())); + FileSourceDTO fileSourceDTO = JsonUtils.parseObject(extParams, FileSourceDTO.class); + if (Objects.nonNull(fileSourceDTO)) { + fileSourceDTO.setDataSeparator(dataSeparator); + dataConfig.setAuditVersion(fileSourceDTO.getAuditVersion()); + fileSourceDTO.setDataContentStyle(streamEntity.getDataType()); + extParams = JsonUtils.toJsonString(fileSourceDTO); + } + return extParams; + } + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/cos/COSSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/sql/SqlSourceOperator.java similarity index 69% copy from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/cos/COSSourceOperator.java copy to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/sql/SqlSourceOperator.java index f99a6321bd..20425fad84 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/cos/COSSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/sql/SqlSourceOperator.java @@ -15,25 +15,28 @@ * limitations under the License. */ -package org.apache.inlong.manager.service.source.cos; +package org.apache.inlong.manager.service.source.sql; +import org.apache.inlong.common.pojo.agent.DataConfig; import org.apache.inlong.manager.common.consts.DataNodeType; import org.apache.inlong.manager.common.consts.SourceType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.dao.entity.InlongStreamEntity; import org.apache.inlong.manager.dao.entity.StreamSourceEntity; +import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper; import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper; -import org.apache.inlong.manager.pojo.node.cos.COSDataNodeInfo; +import org.apache.inlong.manager.pojo.node.sql.SqlDataNodeInfo; import org.apache.inlong.manager.pojo.source.DataAddTaskDTO; import org.apache.inlong.manager.pojo.source.DataAddTaskRequest; import org.apache.inlong.manager.pojo.source.SourceRequest; import org.apache.inlong.manager.pojo.source.StreamSource; -import org.apache.inlong.manager.pojo.source.cos.COSDataAddTaskRequest; -import org.apache.inlong.manager.pojo.source.cos.COSSource; -import org.apache.inlong.manager.pojo.source.cos.COSSourceDTO; -import org.apache.inlong.manager.pojo.source.cos.COSSourceRequest; +import org.apache.inlong.manager.pojo.source.sql.SqlDataAddTaskRequest; +import org.apache.inlong.manager.pojo.source.sql.SqlSource; +import org.apache.inlong.manager.pojo.source.sql.SqlSourceDTO; +import org.apache.inlong.manager.pojo.source.sql.SqlSourceRequest; import org.apache.inlong.manager.pojo.stream.StreamField; import org.apache.inlong.manager.service.source.AbstractSourceOperator; @@ -51,50 +54,52 @@ import java.util.Objects; import java.util.stream.Collectors; /** - * COS source operator, such as get or set COS source info. + * Sql source operator, such as get or set Sql source info. */ @Service -public class COSSourceOperator extends AbstractSourceOperator { +public class SqlSourceOperator extends AbstractSourceOperator { - private static final Logger LOGGER = LoggerFactory.getLogger(COSSourceOperator.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SqlSourceOperator.class); @Autowired private ObjectMapper objectMapper; @Autowired private StreamSourceEntityMapper sourceMapper; + @Autowired + private InlongStreamEntityMapper streamEntityMapper; @Override public Boolean accept(String sourceType) { - return SourceType.COS.equals(sourceType); + return SourceType.SQL.equals(sourceType); } @Override protected String getSourceType() { - return SourceType.COS; + return SourceType.SQL; } @Override protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) { - COSSourceRequest sourceRequest = (COSSourceRequest) request; + SqlSourceRequest sourceRequest = (SqlSourceRequest) request; try { CommonBeanUtils.copyProperties(sourceRequest, targetEntity, true); - COSSourceDTO dto = COSSourceDTO.getFromRequest(sourceRequest, targetEntity.getExtParams()); + SqlSourceDTO dto = SqlSourceDTO.getFromRequest(sourceRequest, targetEntity.getExtParams()); targetEntity.setExtParams(objectMapper.writeValueAsString(dto)); } catch (Exception e) { throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT, - String.format("serialize extParams of COS SourceDTO failure: %s", e.getMessage())); + String.format("serialize extParams of Sql SourceDTO failure: %s", e.getMessage())); } } @Override public StreamSource getFromEntity(StreamSourceEntity entity) { - COSSource source = new COSSource(); + SqlSource source = new SqlSource(); if (entity == null) { return source; } - COSSourceDTO dto = COSSourceDTO.getFromJson(entity.getExtParams()); + SqlSourceDTO dto = SqlSourceDTO.getFromJson(entity.getExtParams()); CommonBeanUtils.copyProperties(entity, source, true); CommonBeanUtils.copyProperties(dto, source, true); @@ -113,13 +118,15 @@ public class COSSourceOperator extends AbstractSourceOperator { @Override public String getExtParams(StreamSourceEntity sourceEntity) { - COSSourceDTO cosSourceDTO = COSSourceDTO.getFromJson(sourceEntity.getExtParams()); - if (Objects.nonNull(cosSourceDTO) && StringUtils.isNotBlank(sourceEntity.getDataNodeName())) { - COSDataNodeInfo dataNodeInfo = - (COSDataNodeInfo) dataNodeService.getByKeyWithoutTenant(sourceEntity.getDataNodeName(), - DataNodeType.COS); - CommonBeanUtils.copyProperties(dataNodeInfo, cosSourceDTO, true); - return JsonUtils.toJsonString(cosSourceDTO); + SqlSourceDTO sqlSourceDTO = SqlSourceDTO.getFromJson(sourceEntity.getExtParams()); + if (Objects.nonNull(sqlSourceDTO) && StringUtils.isNotBlank(sourceEntity.getDataNodeName())) { + SqlDataNodeInfo dataNodeInfo = + (SqlDataNodeInfo) dataNodeService.getByKeyWithoutTenant(sourceEntity.getDataNodeName(), + DataNodeType.SQL); + sqlSourceDTO.setJdbcUrl(dataNodeInfo.getUrl()); + sqlSourceDTO.setUsername(dataNodeInfo.getUsername()); + sqlSourceDTO.setJdbcPassword(dataNodeInfo.getToken()); + return JsonUtils.toJsonString(sqlSourceDTO); } return sourceEntity.getExtParams(); } @@ -128,16 +135,15 @@ public class COSSourceOperator extends AbstractSourceOperator { @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ) public Integer addDataAddTask(DataAddTaskRequest request, String operator) { try { - COSDataAddTaskRequest sourceRequest = (COSDataAddTaskRequest) request; + SqlDataAddTaskRequest sourceRequest = (SqlDataAddTaskRequest) request; StreamSourceEntity sourceEntity = sourceMapper.selectById(request.getSourceId()); - COSSourceDTO dto = COSSourceDTO.getFromJson(sourceEntity.getExtParams()); + SqlSourceDTO dto = SqlSourceDTO.getFromJson(sourceEntity.getExtParams()); dto.setDataTimeFrom(sourceRequest.getDataTimeFrom()); dto.setDataTimeTo(sourceRequest.getDataTimeTo()); dto.setRetry(true); if (request.getIncreaseAuditVersion()) { dto.setAuditVersion(request.getAuditVersion()); } - dto.setFilterStreams(sourceRequest.getFilterStreams()); StreamSourceEntity dataAddTaskEntity = CommonBeanUtils.copyProperties(sourceEntity, StreamSourceEntity::new); dataAddTaskEntity.setId(null); @@ -151,10 +157,21 @@ public class COSSourceOperator extends AbstractSourceOperator { updateAgentTaskConfig(dataAddTaskRequest, operator); return id; } catch (Exception e) { - LOGGER.error("serialize extParams of COS SourceDTO failure: ", e); + LOGGER.error("serialize extParams of Sql SourceDTO failure: ", e); throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT, - String.format("serialize extParams of COS SourceDTO failure: %s", e.getMessage())); + String.format("serialize extParams of Sql SourceDTO failure: %s", e.getMessage())); } } + @Override + public String updateDataConfig(String extParams, InlongStreamEntity streamEntity, DataConfig dataConfig) { + String dataSeparator = String.valueOf((char) Integer.parseInt(streamEntity.getDataSeparator())); + SqlSourceDTO sqlSourceDTO = JsonUtils.parseObject(extParams, SqlSourceDTO.class); + if (Objects.nonNull(sqlSourceDTO)) { + sqlSourceDTO.setDataSeparator(dataSeparator); + dataConfig.setAuditVersion(sqlSourceDTO.getAuditVersion()); + extParams = JsonUtils.toJsonString(sqlSourceDTO); + } + return extParams; + } }