This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new 15da36a0e [INLONG-7151][Manager] Fix failure to create node when init sort (#7152)
15da36a0e is described below

commit 15da36a0ef58f1401bd558ee80d7814b890739fa
Author: haifxu <xhf1208357...@gmail.com>
AuthorDate: Thu Jan 5 09:49:52 2023 +0800

    [INLONG-7151][Manager] Fix failure to create node when init sort (#7152)
---
 .../inlong/manager/service/sink/mysql/MySQLSinkOperator.java | 10 ++++++++++
 .../manager/service/source/kafka/KafkaSourceOperator.java    |  6 +++++-
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
index cafa79125..a81b9ff34 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
@@ -18,9 +18,11 @@
 package org.apache.inlong.manager.service.sink.mysql;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -81,6 +83,14 @@ public class MySQLSinkOperator extends AbstractSinkOperator {
         }
 
         MySQLSinkDTO dto = MySQLSinkDTO.getFromJson(entity.getExtParams());
+        if (StringUtils.isBlank(dto.getJdbcUrl())) {
+            String dataNodeName = entity.getDataNodeName();
+            Preconditions.checkNotEmpty(dataNodeName, "mysql jdbc url not specified and data node is empty");
+            DataNodeInfo dataNodeInfo = dataNodeHelper.getDataNodeInfo(dataNodeName, entity.getSinkType());
+            CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
+            dto.setJdbcUrl(dataNodeInfo.getUrl());
+            dto.setPassword(dataNodeInfo.getToken());
+        }
         CommonBeanUtils.copyProperties(entity, sink, true);
         CommonBeanUtils.copyProperties(dto, sink, true);
         List<SinkField> sinkFields = super.getSinkFields(entity.getId());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
index 6b6dff0cb..686b81c39 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.source.kafka;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.ClusterType;
@@ -118,7 +119,10 @@ public class KafkaSourceOperator extends AbstractSourceOperator {
                 if (!Objects.equals(streamId, sourceInfo.getInlongStreamId())) {
                     continue;
                 }
-                kafkaSource.setSerializationType(sourceInfo.getSerializationType());
+                if (StringUtils.isEmpty(kafkaSource.getSerializationType()) && StringUtils.isNotEmpty(
+                        sourceInfo.getSerializationType())) {
+                    kafkaSource.setSerializationType(sourceInfo.getSerializationType());
+                }
             }
 
             kafkaSource.setWrapWithInlongMsg(streamInfo.getWrapWithInlongMsg());