This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch branch-1.5 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit aa6c0809927ad60e56f875158fa58ca61cb1b595 Author: haifxu <xhf1208357...@gmail.com> AuthorDate: Tue Jan 3 18:58:25 2023 +0800 [INLONG-7130][Manager] Fix null JDBC URL when mysql stream sink init (#7132) --- .../resource/sink/mysql/MySQLResourceOperator.java | 24 +++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java index 6c859cf2a..fb834e3fa 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/mysql/MySQLResourceOperator.java @@ -18,16 +18,21 @@ package org.apache.inlong.manager.service.resource.sink.mysql; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.consts.SinkType; import org.apache.inlong.manager.common.enums.SinkStatus; import org.apache.inlong.manager.common.exceptions.WorkflowException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity; import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper; +import org.apache.inlong.manager.pojo.node.DataNodeInfo; import org.apache.inlong.manager.pojo.sink.SinkInfo; import org.apache.inlong.manager.pojo.sink.mysql.MySQLColumnInfo; import org.apache.inlong.manager.pojo.sink.mysql.MySQLSinkDTO; import org.apache.inlong.manager.pojo.sink.mysql.MySQLTableInfo; +import org.apache.inlong.manager.service.node.DataNodeOperateHelper; import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator; import org.apache.inlong.manager.service.sink.StreamSinkService; import org.slf4j.Logger; @@ -53,6 +58,9 @@ public class MySQLResourceOperator implements SinkResourceOperator { @Autowired private StreamSinkFieldEntityMapper fieldEntityMapper; + @Autowired + private DataNodeOperateHelper dataNodeHelper; + @Override public Boolean accept(String sinkType) { return SinkType.MYSQL.equals(sinkType); @@ -90,7 +98,7 @@ public class MySQLResourceOperator implements SinkResourceOperator { columnList.add(columnInfo); } - MySQLSinkDTO sinkDTO = MySQLSinkDTO.getFromJson(sinkInfo.getExtParams()); + MySQLSinkDTO sinkDTO = this.getMysqlInfo(sinkInfo); MySQLTableInfo tableInfo = MySQLSinkDTO.getTableInfo(sinkDTO, columnList); try (Connection conn = MySQLJdbcUtils.getConnection(sinkDTO.getJdbcUrl(), sinkDTO.getUsername(), sinkDTO.getPassword())) { @@ -114,4 +122,18 @@ public class MySQLResourceOperator implements SinkResourceOperator { LOG.info("success create MySQL table for data sink [" + sinkInfo.getId() + "]"); } + private MySQLSinkDTO getMysqlInfo(SinkInfo sinkInfo) { + MySQLSinkDTO mysqlInfo = MySQLSinkDTO.getFromJson(sinkInfo.getExtParams()); + + if (StringUtils.isBlank(mysqlInfo.getJdbcUrl())) { + String dataNodeName = sinkInfo.getDataNodeName(); + Preconditions.checkNotEmpty(dataNodeName, "mysql jdbc url not specified and data node is empty"); + DataNodeInfo dataNodeInfo = dataNodeHelper.getDataNodeInfo(dataNodeName, sinkInfo.getSinkType()); + CommonBeanUtils.copyProperties(dataNodeInfo, mysqlInfo); + mysqlInfo.setJdbcUrl(dataNodeInfo.getUrl()); + mysqlInfo.setPassword(dataNodeInfo.getToken()); + } + return mysqlInfo; + } + } \ No newline at end of file