This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new 6ce73d2c10 [INLONG-11307][Manager] Fix the problem of unable to obtain extparams from multiple data sources (#11308)
6ce73d2c10 is described below

commit 6ce73d2c10b71c72928780106e670e80fcc7662e
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Wed Oct 9 12:43:33 2024 +0800

    [INLONG-11307][Manager] Fix the problem of unable to obtain extparams from multiple data sources (#11308)
---
 .../inlong/manager/service/source/AbstractSourceOperator.java | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 7c4c0c4882..5caf6d2473 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -74,6 +74,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
 import org.springframework.transaction.annotation.Isolation;
 import org.springframework.transaction.annotation.Transactional;
 
@@ -119,6 +120,9 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
     private InlongStreamEntityMapper streamMapper;
     @Autowired
     private ObjectMapper objectMapper;
+    @Autowired
+    private AutowireCapableBeanFactory autowireCapableBeanFactory;
+    private SourceOperatorFactory operatorFactory;
 
     /**
      * Getting the source type.
@@ -523,7 +527,12 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
 
         InlongGroupEntity groupEntity = groupMapper.selectByGroupIdWithoutTenant(groupId);
         InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
-        String extParams = getExtParams(entity);
+        if (operatorFactory == null) {
+            operatorFactory = new SourceOperatorFactory();
+            autowireCapableBeanFactory.autowireBean(operatorFactory);
+        }
+        StreamSourceOperator sourceOperator = operatorFactory.getInstance(entity.getSourceType());
+        String extParams = sourceOperator.getExtParams(entity);
         if (groupEntity != null && streamEntity != null) {
             dataConfig.setState(
                     SourceStatus.NORMAL_STATUS_SET.contains(SourceStatus.forCode(entity.getStatus()))