This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a commit to branch dev-offline-sync in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 2674a5645c1f0f0e821ec26efb1351c4297dd056 Author: AloysZhang <aloyszh...@apache.org> AuthorDate: Wed Mar 13 16:18:04 2024 +0800 [INLONG-9813][Manager] Support offline data sync management (#9814) --- .../apache/inlong/manager/plugin/listener/StartupSortListener.java | 3 ++- .../manager/service/listener/group/InitGroupCompleteListener.java | 3 ++- .../manager/service/listener/group/UpdateGroupCompleteListener.java | 3 ++- .../manager/service/listener/queue/QueueResourceListener.java | 6 ++++-- .../manager/service/listener/stream/InitStreamCompleteListener.java | 3 ++- .../inlong/manager/service/source/AbstractSourceOperator.java | 1 + .../inlong/manager/service/source/StreamSourceServiceImpl.java | 3 ++- 7 files changed, 15 insertions(+), 7 deletions(-) diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java index 9997c2abd2..a84d3f5868 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java @@ -61,7 +61,8 @@ public class StartupSortListener implements SortOperateListener { } log.info("add startup group listener for groupId [{}]", groupId); - return InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode()); + return (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode()) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode())); } @Override diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java index c96c12c9ff..c8c087320f 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java @@ -98,7 +98,8 @@ public class InitGroupCompleteListener implements ProcessEventListener { // update status of other related configs if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(groupInfo.getEnableCreateResource())) { - if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) { + if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode()) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) { sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_NORMAL.getCode(), operator); } else { sourceService.updateStatus(groupId, null, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java index a6dd8fc786..1a6e1f7fe1 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java @@ -96,7 +96,8 @@ public class UpdateGroupCompleteListener implements ProcessEventListener { } // if the inlong group is dataSync mode, the stream source needs to be processed. - if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) { + if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode()) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) { changeSource4DataSync(groupId, operateType, operator); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java index f566c29728..0ce551d1e6 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java @@ -115,8 +115,10 @@ public class QueueResourceListener implements QueueOperateListener { String operator = context.getOperator(); GroupOperateType operateType = groupProcessForm.getGroupOperateType(); - if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) { - log.warn("skip to execute QueueResourceListener as sync mode for groupId={}", groupId); + if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode()) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) { + log.warn("skip to execute QueueResourceListener as sync mode {} (1 for realtime sync, 2 for offline sync) " + + "for groupId={}", groupInfo.getInlongGroupMode(), groupId); if (GroupOperateType.INIT.equals(operateType)) { this.createQueueForStreams(groupInfo, groupProcessForm.getStreamInfos(), operator); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java index df41823b84..85033b44f7 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java @@ -66,7 +66,8 @@ public class InitStreamCompleteListener implements ProcessEventListener { // Update status of other related configs streamService.updateStatus(groupId, streamId, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator); streamService.updateWithoutCheck(streamInfo.genRequest(), operator); - if (InlongConstants.DATASYNC_REALTIME_MODE.equals(form.getGroupInfo().getInlongGroupMode())) { + if (InlongConstants.DATASYNC_REALTIME_MODE.equals(form.getGroupInfo().getInlongGroupMode()) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(form.getGroupInfo().getInlongGroupMode())) { sourceService.updateStatus(groupId, streamId, SourceStatus.SOURCE_NORMAL.getCode(), operator); } else { sourceService.updateStatus(groupId, streamId, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java index 83b7ac3ea9..39d3845209 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java @@ -139,6 +139,7 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator { return; } boolean allowUpdate = InlongConstants.DATASYNC_REALTIME_MODE.equals(groupMode) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupMode) || SourceStatus.ALLOWED_UPDATE.contains(entity.getStatus()); if (!allowUpdate) { throw new BusinessException(ErrorCodeEnum.SOURCE_OPT_NOT_ALLOWED, diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java index a610fb6214..34cc62a250 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java @@ -248,7 +248,8 @@ public class StreamSourceServiceImpl implements StreamSourceService { // if the group mode is DATASYNC, just get all related stream sources List<StreamSource> streamSources = this.listSource(groupId, null); - if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) { + if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode()) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) { result = streamSources.stream() .collect(Collectors.groupingBy(StreamSource::getInlongStreamId, HashMap::new, Collectors.toCollection(ArrayList::new)));