This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new f5950eed5a [INLONG-9921][Manager] Fix the problem of manager can't stop data sync job (#9942) f5950eed5a is described below commit f5950eed5a4f1a0479d2a6e1547b763c83d5c9bf Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Tue Apr 9 17:15:32 2024 +0800 [INLONG-9921][Manager] Fix the problem of manager can't stop data sync job (#9942) * [INLONG-9921][Manager] Fix the problem of manager can't stop data sync job --- .../inlong/manager/common/enums/GroupStatus.java | 21 +++++++++++---------- .../manager/common/enums/SimpleGroupStatus.java | 4 ++-- .../inlong/manager/common/enums/StreamStatus.java | 15 +++++++-------- .../manager/service/core/impl/AgentServiceImpl.java | 2 +- .../service/group/InlongGroupProcessService.java | 2 +- .../listener/group/UpdateGroupCompleteListener.java | 16 +++++++++++++++- .../stream/UpdateStreamCompleteListener.java | 4 ++-- .../listener/stream/UpdateStreamListener.java | 4 ++-- .../service/stream/InlongStreamProcessService.java | 8 ++++---- .../manager/service/core/impl/AgentServiceTest.java | 8 +++++--- .../group/InlongGroupProcessServiceTest.java | 2 +- .../source/listener/StreamSourceListenerTest.java | 2 +- 12 files changed, 52 insertions(+), 36 deletions(-) diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java index 79e89ad53b..759f5a03b4 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java @@ -41,13 +41,13 @@ public enum GroupStatus { CONFIG_FAILED(120, "configuration failed"), CONFIG_SUCCESSFUL(130, "configuration successful"), - CONFIG_OFFLINE_ING(141, "in configure offline"), - CONFIGURATION_OFFLINE(140, "configure offline successful"), + CONFIG_OFFLINE_ING(141, "configuration is going offline"), + CONFIG_OFFLINE_SUCCESSFUL(140, "configuration offline successful"), - CONFIG_ONLINE_ING(151, "in configure online"), + CONFIG_ONLINE_ING(151, "configuration is going online"), - CONFIG_DELETING(41, "configure deleting"), - CONFIG_DELETED(40, "configure deleted"), + CONFIG_DELETING(41, "configuration deleting"), + CONFIG_DELETED(40, "configuration deleted"), // FINISH is used for batch task. FINISH(131, "finish"); @@ -71,9 +71,10 @@ public enum GroupStatus { Sets.newHashSet(CONFIG_SUCCESSFUL, TO_BE_APPROVAL, CONFIG_ING, CONFIG_OFFLINE_ING, CONFIG_DELETING)); GROUP_STATE_AUTOMATON.put( - CONFIG_OFFLINE_ING, Sets.newHashSet(CONFIG_OFFLINE_ING, CONFIGURATION_OFFLINE, CONFIG_FAILED)); - GROUP_STATE_AUTOMATON.put(CONFIGURATION_OFFLINE, Sets.newHashSet(CONFIGURATION_OFFLINE, CONFIG_ONLINE_ING, - CONFIG_DELETING)); + CONFIG_OFFLINE_ING, Sets.newHashSet(CONFIG_OFFLINE_ING, CONFIG_OFFLINE_SUCCESSFUL, CONFIG_FAILED)); + GROUP_STATE_AUTOMATON.put( + CONFIG_OFFLINE_SUCCESSFUL, Sets.newHashSet(CONFIG_OFFLINE_SUCCESSFUL, CONFIG_ONLINE_ING, + CONFIG_DELETING)); GROUP_STATE_AUTOMATON.put(CONFIG_ONLINE_ING, Sets.newHashSet(CONFIG_ONLINE_ING, CONFIG_FAILED, CONFIG_SUCCESSFUL)); @@ -143,7 +144,7 @@ public enum GroupStatus { return status == GroupStatus.APPROVE_PASSED || status == GroupStatus.CONFIG_FAILED || status == GroupStatus.CONFIG_SUCCESSFUL - || status == GroupStatus.CONFIGURATION_OFFLINE + || status == GroupStatus.CONFIG_OFFLINE_SUCCESSFUL || status == GroupStatus.FINISH; } @@ -163,7 +164,7 @@ public enum GroupStatus { */ public static boolean allowedSuspend(GroupStatus status) { return status == GroupStatus.CONFIG_SUCCESSFUL - || status == GroupStatus.CONFIGURATION_OFFLINE + || status == GroupStatus.CONFIG_OFFLINE_SUCCESSFUL || status == GroupStatus.FINISH; } diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleGroupStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleGroupStatus.java index d9f6bdc98d..879902f9d4 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleGroupStatus.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleGroupStatus.java @@ -52,7 +52,7 @@ public enum SimpleGroupStatus { return FAILED; case CONFIG_SUCCESSFUL: return STARTED; - case CONFIGURATION_OFFLINE: + case CONFIG_OFFLINE_SUCCESSFUL: return STOPPED; case FINISH: return FINISHED; @@ -101,7 +101,7 @@ public enum SimpleGroupStatus { statusList.add(GroupStatus.CONFIG_SUCCESSFUL.getCode()); return statusList; case STOPPED: - statusList.add(GroupStatus.CONFIGURATION_OFFLINE.getCode()); + statusList.add(GroupStatus.CONFIG_OFFLINE_SUCCESSFUL.getCode()); return statusList; case FINISHED: statusList.add(GroupStatus.FINISH.getCode()); diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/StreamStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/StreamStatus.java index 383563db34..dad880e37c 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/StreamStatus.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/StreamStatus.java @@ -27,11 +27,10 @@ public enum StreamStatus { CONFIG_FAILED(120, "configuration failed"), CONFIG_SUCCESSFUL(130, "configuration successful"), - SUSPENDING(141, "suspending"), - SUSPENDED(140, "suspended"), + CONFIG_OFFLINE_ING(141, "configuration is going offline"), + CONFIG_OFFLINE_SUCCESSFUL(140, "configuration offline successful"), - RESTARTING(151, "restarting"), - RESTARTED(150, "restarted"), + CONFIG_ONLINE_ING(151, "configuration is going online"), DELETING(41, "deleting"), DELETED(40, "deleted"); @@ -48,8 +47,8 @@ public enum StreamStatus { * Checks whether the given status allows updating operate. */ public static boolean notAllowedUpdate(StreamStatus status) { - return status == StreamStatus.CONFIG_ING || status == StreamStatus.SUSPENDING - || status == StreamStatus.RESTARTING || status == StreamStatus.DELETING; + return status == StreamStatus.CONFIG_ING || status == StreamStatus.CONFIG_OFFLINE_ING + || status == StreamStatus.CONFIG_ONLINE_ING || status == StreamStatus.DELETING; } /** @@ -57,8 +56,8 @@ public enum StreamStatus { */ public static boolean notAllowedDelete(StreamStatus status) { return status == StreamStatus.CONFIG_ING - || status == StreamStatus.RESTARTING - || status == StreamStatus.SUSPENDING; + || status == StreamStatus.CONFIG_ONLINE_ING + || status == StreamStatus.CONFIG_OFFLINE_ING; } public static StreamStatus forCode(int code) { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java index 682f130747..8d881d81d1 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java @@ -525,7 +525,7 @@ public class AgentServiceImpl implements AgentService { List<StreamSourceEntity> sourceEntities = sourceMapper.selectTemplateSourceByCluster(needCopiedStatusList, Lists.newArrayList(SourceType.FILE), agentClusterName); Set<GroupStatus> noNeedAddTask = Sets.newHashSet( - GroupStatus.CONFIGURATION_OFFLINE, GroupStatus.CONFIG_OFFLINE_ING, GroupStatus.CONFIG_DELETING, + GroupStatus.CONFIG_OFFLINE_SUCCESSFUL, GroupStatus.CONFIG_OFFLINE_ING, GroupStatus.CONFIG_DELETING, GroupStatus.CONFIG_DELETED); sourceEntities.stream() .forEach(sourceEntity -> { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java index a1460e4ed4..dbb78584b7 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java @@ -336,7 +336,7 @@ public class InlongGroupProcessService { case CONFIG_ONLINE_ING: return GroupStatus.CONFIG_SUCCESSFUL; case CONFIG_OFFLINE_ING: - return GroupStatus.CONFIGURATION_OFFLINE; + return GroupStatus.CONFIG_OFFLINE_SUCCESSFUL; default: return GroupStatus.CONFIG_DELETED; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java index d21ca83c92..445ee647ef 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java @@ -22,19 +22,25 @@ import org.apache.inlong.manager.common.enums.GroupOperateType; import org.apache.inlong.manager.common.enums.GroupStatus; import org.apache.inlong.manager.common.enums.ProcessEvent; import org.apache.inlong.manager.common.enums.SourceStatus; +import org.apache.inlong.manager.common.enums.StreamStatus; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.group.InlongGroupRequest; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm; import org.apache.inlong.manager.service.group.InlongGroupService; import org.apache.inlong.manager.service.source.StreamSourceService; +import org.apache.inlong.manager.service.stream.InlongStreamService; import org.apache.inlong.manager.workflow.WorkflowContext; import org.apache.inlong.manager.workflow.event.ListenerResult; import org.apache.inlong.manager.workflow.event.process.ProcessEventListener; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.util.List; + /** * The listener of InlongGroup when update operates successfully. */ @@ -46,6 +52,8 @@ public class UpdateGroupCompleteListener implements ProcessEventListener { private InlongGroupService groupService; @Autowired private StreamSourceService sourceService; + @Autowired + private InlongStreamService streamService; @Override public ProcessEvent event() { @@ -63,12 +71,18 @@ public class UpdateGroupCompleteListener implements ProcessEventListener { InlongGroupInfo groupInfo = form.getGroupInfo(); InlongGroupRequest groupRequest = groupInfo.genRequest(); String operator = context.getOperator(); + List<InlongStreamInfo> streamInfos = form.getStreamInfos(); + if (CollectionUtils.isNotEmpty(streamInfos)) { + streamInfos.forEach(streamInfo -> streamService.updateWithoutCheck(streamInfo.genRequest(), operator)); + } switch (operateType) { case SUSPEND: - groupService.updateStatus(groupId, GroupStatus.CONFIGURATION_OFFLINE.getCode(), operator); + streamService.updateStatus(groupId, null, StreamStatus.CONFIG_OFFLINE_SUCCESSFUL.getCode(), operator); + groupService.updateStatus(groupId, GroupStatus.CONFIG_OFFLINE_SUCCESSFUL.getCode(), operator); groupService.update(groupRequest, operator); break; case RESTART: + streamService.updateStatus(groupId, null, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator); groupService.updateStatus(groupId, GroupStatus.CONFIG_SUCCESSFUL.getCode(), operator); groupService.update(groupRequest, operator); break; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/UpdateStreamCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/UpdateStreamCompleteListener.java index 8d07861374..896feaefb0 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/UpdateStreamCompleteListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/UpdateStreamCompleteListener.java @@ -57,10 +57,10 @@ public class UpdateStreamCompleteListener implements ProcessEventListener { StreamStatus status; switch (operateType) { case RESTART: - status = StreamStatus.RESTARTED; + status = StreamStatus.CONFIG_SUCCESSFUL; break; case SUSPEND: - status = StreamStatus.SUSPENDED; + status = StreamStatus.CONFIG_OFFLINE_SUCCESSFUL; break; case DELETE: status = StreamStatus.DELETED; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/UpdateStreamListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/UpdateStreamListener.java index f5235a3d1b..fd7faaf527 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/UpdateStreamListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/UpdateStreamListener.java @@ -54,10 +54,10 @@ public class UpdateStreamListener implements ProcessEventListener { final String streamId = streamInfo.getInlongStreamId(); switch (operateType) { case SUSPEND: - streamService.updateStatus(groupId, streamId, StreamStatus.SUSPENDING.getCode(), operator); + streamService.updateStatus(groupId, streamId, StreamStatus.CONFIG_OFFLINE_ING.getCode(), operator); break; case RESTART: - streamService.updateStatus(groupId, streamId, StreamStatus.RESTARTING.getCode(), operator); + streamService.updateStatus(groupId, streamId, StreamStatus.CONFIG_ONLINE_ING.getCode(), operator); break; case DELETE: streamService.updateStatus(groupId, streamId, StreamStatus.DELETING.getCode(), operator); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java index b37fd7b5b6..59a1390111 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java @@ -134,12 +134,12 @@ public class InlongStreamProcessService { InlongStreamInfo streamInfo = streamService.get(groupId, streamId); Preconditions.expectNotNull(streamInfo, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage()); StreamStatus status = StreamStatus.forCode(streamInfo.getStatus()); - if (status == StreamStatus.SUSPENDED || status == StreamStatus.SUSPENDING) { + if (status == StreamStatus.CONFIG_OFFLINE_SUCCESSFUL || status == StreamStatus.CONFIG_OFFLINE_ING) { log.warn("groupId={}, streamId={} is already in {}", groupId, streamId, status); return true; } - if (status != StreamStatus.CONFIG_SUCCESSFUL && status != StreamStatus.RESTARTED) { + if (status != StreamStatus.CONFIG_SUCCESSFUL) { throw new BusinessException(String.format("stream status=%s not support suspend stream" + " for groupId=%s streamId=%s", status, groupId, streamId)); } @@ -176,12 +176,12 @@ public class InlongStreamProcessService { InlongStreamInfo streamInfo = streamService.get(groupId, streamId); Preconditions.expectNotNull(streamInfo, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage()); StreamStatus status = StreamStatus.forCode(streamInfo.getStatus()); - if (status == StreamStatus.RESTARTED || status == StreamStatus.RESTARTING) { + if (status == StreamStatus.CONFIG_ONLINE_ING) { log.warn("inlong stream was already in {} for groupId={}, streamId={}", status, groupId, streamId); return true; } - if (status != StreamStatus.SUSPENDED) { + if (status != StreamStatus.CONFIG_OFFLINE_SUCCESSFUL) { throw new BusinessException(String.format("stream status=%s not support restart stream" + " for groupId=%s streamId=%s", status, groupId, streamId)); } diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java index 41acbdfd86..9b267eaa3b 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java @@ -143,8 +143,9 @@ class AgentServiceTest extends ServiceBaseTest { sources.stream() .filter(source -> source.getTaskMapId() != null) .forEach(source -> sourceService.stop(source.getId(), GLOBAL_OPERATOR)); - groupMapper.updateStatus(groupId, GroupStatus.CONFIGURATION_OFFLINE.getCode(), GLOBAL_OPERATOR); - streamMapper.updateStatusByIdentifier(groupId, streamId, StreamStatus.SUSPENDED.getCode(), GLOBAL_OPERATOR); + groupMapper.updateStatus(groupId, GroupStatus.CONFIG_OFFLINE_SUCCESSFUL.getCode(), GLOBAL_OPERATOR); + streamMapper.updateStatusByIdentifier(groupId, streamId, StreamStatus.CONFIG_OFFLINE_SUCCESSFUL.getCode(), + GLOBAL_OPERATOR); } /** @@ -156,7 +157,8 @@ class AgentServiceTest extends ServiceBaseTest { .filter(source -> source.getTaskMapId() != null) .forEach(source -> sourceService.restart(source.getId(), GLOBAL_OPERATOR)); groupMapper.updateStatus(groupId, GroupStatus.CONFIG_SUCCESSFUL.getCode(), GLOBAL_OPERATOR); - streamMapper.updateStatusByIdentifier(groupId, streamId, StreamStatus.RESTARTED.getCode(), GLOBAL_OPERATOR); + streamMapper.updateStatusByIdentifier(groupId, streamId, StreamStatus.CONFIG_SUCCESSFUL.getCode(), + GLOBAL_OPERATOR); } public void deleteSource(String groupId, String streamId) { diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/group/InlongGroupProcessServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/group/InlongGroupProcessServiceTest.java index 96bbef6a7a..5087849289 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/group/InlongGroupProcessServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/group/InlongGroupProcessServiceTest.java @@ -96,7 +96,7 @@ public class InlongGroupProcessServiceTest extends ServiceBaseTest { ProcessResponse response = result.getProcessInfo(); Assertions.assertSame(response.getStatus(), ProcessStatus.COMPLETED); InlongGroupInfo groupInfo = groupService.get(GROUP_ID); - Assertions.assertEquals(groupInfo.getStatus(), GroupStatus.CONFIGURATION_OFFLINE.getCode()); + Assertions.assertEquals(groupInfo.getStatus(), GroupStatus.CONFIG_OFFLINE_SUCCESSFUL.getCode()); } private void testRestartProcess() { diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/StreamSourceListenerTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/StreamSourceListenerTest.java index 42df6bfc65..99e1420915 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/StreamSourceListenerTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/StreamSourceListenerTest.java @@ -103,7 +103,7 @@ public class StreamSourceListenerTest extends ServiceBaseTest { private void testRestartSource(Integer sourceId) { groupService.updateStatus(GROUP_ID, GroupStatus.CONFIG_OFFLINE_ING.getCode(), GLOBAL_OPERATOR); groupService.update(groupInfo.genRequest(), GLOBAL_OPERATOR); - groupService.updateStatus(GROUP_ID, GroupStatus.CONFIGURATION_OFFLINE.getCode(), GLOBAL_OPERATOR); + groupService.updateStatus(GROUP_ID, GroupStatus.CONFIG_OFFLINE_SUCCESSFUL.getCode(), GLOBAL_OPERATOR); groupService.update(groupInfo.genRequest(), GLOBAL_OPERATOR); sourceService.updateStatus(GROUP_ID, null, SourceStatus.SOURCE_NORMAL.getCode(), GLOBAL_OPERATOR);