This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a commit to branch dev-offline-sync in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 06a84650c7c5eceb86183f35b24a9486c9a5ca60 Author: AloysZhang <aloyszh...@apache.org> AuthorDate: Wed Mar 13 16:18:04 2024 +0800 [INLONG-9813][Manager] Support offline data sync management (#9814) --- .../inlong/manager/plugin/listener/StartupSortListener.java | 13 +++++++++---- .../service/listener/group/InitGroupCompleteListener.java | 3 ++- .../service/listener/group/UpdateGroupCompleteListener.java | 3 ++- .../service/listener/queue/QueueResourceListener.java | 6 ++++-- .../service/listener/stream/InitStreamCompleteListener.java | 3 ++- .../manager/service/source/AbstractSourceOperator.java | 1 + .../manager/service/source/StreamSourceServiceImpl.java | 3 ++- 7 files changed, 22 insertions(+), 10 deletions(-) diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java index ab9e9b55d8..fae5faa278 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java @@ -73,7 +73,8 @@ public class StartupSortListener implements SortOperateListener { } log.info("add startup group listener for groupId [{}]", groupId); - return InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode()); + return (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode()) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode())); } @Override @@ -141,9 +142,13 @@ public class StartupSortListener implements SortOperateListener { FlinkOperation flinkOperation = FlinkOperation.getInstance(); try { flinkOperation.genPath(flinkInfo, dataflow); - flinkOperation.start(flinkInfo); - log.info("job submit success for groupId = {}, streamId = {}, jobId = {}", groupId, - streamInfo.getInlongStreamId(), flinkInfo.getJobId()); + // only start job for real-time mode + if (InlongConstants.DATASYNC_REALTIME_MODE + .equals(groupResourceForm.getGroupInfo().getInlongGroupMode())) { + flinkOperation.start(flinkInfo); + log.info("job submit success for groupId = {}, streamId = {}, jobId = {}", groupId, + streamInfo.getInlongStreamId(), flinkInfo.getJobId()); + } } catch (Exception e) { flinkInfo.setException(true); flinkInfo.setExceptionMsg(getExceptionStackMsg(e)); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java index c96c12c9ff..c8c087320f 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java @@ -98,7 +98,8 @@ public class InitGroupCompleteListener implements ProcessEventListener { // update status of other related configs if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(groupInfo.getEnableCreateResource())) { - if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) { + if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode()) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) { sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_NORMAL.getCode(), operator); } else { sourceService.updateStatus(groupId, null, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java index a6dd8fc786..1a6e1f7fe1 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java @@ -96,7 +96,8 @@ public class UpdateGroupCompleteListener implements ProcessEventListener { } // if the inlong group is dataSync mode, the stream source needs to be processed. - if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) { + if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode()) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) { changeSource4DataSync(groupId, operateType, operator); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java index f566c29728..0ce551d1e6 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java @@ -115,8 +115,10 @@ public class QueueResourceListener implements QueueOperateListener { String operator = context.getOperator(); GroupOperateType operateType = groupProcessForm.getGroupOperateType(); - if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) { - log.warn("skip to execute QueueResourceListener as sync mode for groupId={}", groupId); + if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode()) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) { + log.warn("skip to execute QueueResourceListener as sync mode {} (1 for realtime sync, 2 for offline sync) " + + "for groupId={}", groupInfo.getInlongGroupMode(), groupId); if (GroupOperateType.INIT.equals(operateType)) { this.createQueueForStreams(groupInfo, groupProcessForm.getStreamInfos(), operator); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java index df41823b84..85033b44f7 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java @@ -66,7 +66,8 @@ public class InitStreamCompleteListener implements ProcessEventListener { // Update status of other related configs streamService.updateStatus(groupId, streamId, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator); streamService.updateWithoutCheck(streamInfo.genRequest(), operator); - if (InlongConstants.DATASYNC_REALTIME_MODE.equals(form.getGroupInfo().getInlongGroupMode())) { + if (InlongConstants.DATASYNC_REALTIME_MODE.equals(form.getGroupInfo().getInlongGroupMode()) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(form.getGroupInfo().getInlongGroupMode())) { sourceService.updateStatus(groupId, streamId, SourceStatus.SOURCE_NORMAL.getCode(), operator); } else { sourceService.updateStatus(groupId, streamId, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java index 83b7ac3ea9..39d3845209 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java @@ -139,6 +139,7 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator { return; } boolean allowUpdate = InlongConstants.DATASYNC_REALTIME_MODE.equals(groupMode) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupMode) || SourceStatus.ALLOWED_UPDATE.contains(entity.getStatus()); if (!allowUpdate) { throw new BusinessException(ErrorCodeEnum.SOURCE_OPT_NOT_ALLOWED, diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java index 3dd8fa5f26..0bb15d6cad 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java @@ -248,7 +248,8 @@ public class StreamSourceServiceImpl implements StreamSourceService { // if the group mode is DATASYNC, just get all related stream sources List<StreamSource> streamSources = this.listSource(groupId, null); - if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) { + if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode()) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) { result = streamSources.stream() .collect(Collectors.groupingBy(StreamSource::getInlongStreamId, HashMap::new, Collectors.toCollection(ArrayList::new)));