This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new 0c79889b4 [INLONG-5799][Manager] Fix the source delete/stop/restart bug (#5807)
0c79889b4 is described below

commit 0c79889b4fa1262a111f5d66bcef7468bcbd602e
Author: woofyzhao <490467...@qq.com>
AuthorDate: Wed Sep 7 13:57:54 2022 +0800

    [INLONG-5799][Manager] Fix the source delete/stop/restart bug (#5807)

    Co-authored-by: healchow <healc...@gmail.com>
---
 .../service/listener/source/AbstractSourceOperateListener.java | 5 +++++
 .../inlong/manager/service/source/StreamSourceServiceImpl.java | 3 ---
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
index 7f1097e18..c07c86905 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
@@ -99,6 +99,11 @@ public abstract class AbstractSourceOperateListener implements SourceOperateList
      */
     @SneakyThrows
     public boolean checkIfOp(StreamSource streamSource, List<StreamSource> unOperatedSources) {
+        // if a source has sub-sources, it is considered a template source.
+        // template sources do not need to be operated, its sub-sources will be processed in this method later.
+        if (CollectionUtils.isNotEmpty(streamSource.getSubSourceList())) {
+            return false;
+        }
         for (int retry = 0; retry < 60; retry++) {
             int status = streamSource.getStatus();
             SourceStatus sourceStatus = SourceStatus.forCode(status);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 3f14d75a8..7ef31f223 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -237,7 +237,6 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
-        groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
         SourceStatus curStatus = SourceStatus.forCode(entity.getStatus());
         SourceStatus nextStatus = SourceStatus.TO_BE_ISSUED_DELETE;
@@ -272,7 +271,6 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         LOGGER.info("begin to restart source by id={}", id);
         StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
-        groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
         StreamSourceOperator sourceOperator = operatorFactory.getInstance(entity.getSourceType());
         SourceRequest sourceRequest = new SourceRequest();
@@ -290,7 +288,6 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         LOGGER.info("begin to stop source by id={}", id);
         StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
-        groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
         StreamSourceOperator sourceOperator = operatorFactory.getInstance(entity.getSourceType());
         SourceRequest sourceRequest = new SourceRequest();