This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a commit to branch dev-offline-sync in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 5cdd94d4c9d927342b45570c86523948f115d3b2 Author: AloysZhang <aloyszh...@apache.org> AuthorDate: Wed Mar 13 16:18:04 2024 +0800 [INLONG-9813][Manager] Support offline data sync management (#9814) --- .../inlong/manager/plugin/listener/StartupSortListener.java | 13 +++++++++---- .../inlong/manager/service/core/impl/AuditServiceImpl.java | 6 ++++-- .../service/listener/group/InitGroupCompleteListener.java | 3 ++- .../service/listener/group/UpdateGroupCompleteListener.java | 3 ++- .../listener/group/apply/ApproveApplyProcessListener.java | 1 - .../service/listener/queue/QueueResourceListener.java | 6 ++++-- .../service/listener/stream/InitStreamCompleteListener.java | 3 ++- .../manager/service/source/AbstractSourceOperator.java | 1 + .../manager/service/source/StreamSourceServiceImpl.java | 3 ++- 9 files changed, 26 insertions(+), 13 deletions(-) diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java index ab9e9b55d8..fae5faa278 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java @@ -73,7 +73,8 @@ public class StartupSortListener implements SortOperateListener { } log.info("add startup group listener for groupId [{}]", groupId); - return InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode()); + return (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode()) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode())); } @Override @@ -141,9 +142,13 @@ public class StartupSortListener implements SortOperateListener { FlinkOperation flinkOperation = FlinkOperation.getInstance(); try { flinkOperation.genPath(flinkInfo, dataflow); - flinkOperation.start(flinkInfo); - log.info("job submit success for groupId = {}, streamId = {}, jobId = {}", groupId, - streamInfo.getInlongStreamId(), flinkInfo.getJobId()); + // only start job for real-time mode + if (InlongConstants.DATASYNC_REALTIME_MODE + .equals(groupResourceForm.getGroupInfo().getInlongGroupMode())) { + flinkOperation.start(flinkInfo); + log.info("job submit success for groupId = {}, streamId = {}, jobId = {}", groupId, + streamInfo.getInlongStreamId(), flinkInfo.getJobId()); + } } catch (Exception e) { flinkInfo.setException(true); flinkInfo.setExceptionMsg(getExceptionStackMsg(e)); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java index c0086ada23..217f3b5d25 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java @@ -297,7 +297,8 @@ public class AuditServiceImpl implements AuditService { if (CollectionUtils.isEmpty(request.getAuditIds())) { // properly overwrite audit ids by role and stream config - if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupEntity.getInlongGroupMode())) { + if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupEntity.getInlongGroupMode()) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupEntity.getInlongGroupMode())) { auditIdMap.put(getAuditId(sourceNodeType, false), sourceNodeType); request.setAuditIds(getAuditIds(groupId, streamId, sourceNodeType, sinkNodeType)); } else { @@ -462,7 +463,8 @@ public class AuditServiceImpl implements AuditService { } else { auditSet.add(getAuditId(sinkNodeType, true)); InlongGroupEntity inlongGroup = inlongGroupMapper.selectByGroupId(groupId); - if (InlongConstants.DATASYNC_REALTIME_MODE.equals(inlongGroup.getInlongGroupMode())) { + if (InlongConstants.DATASYNC_REALTIME_MODE.equals(inlongGroup.getInlongGroupMode()) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(inlongGroup.getInlongGroupMode())) { auditSet.add(getAuditId(sourceNodeType, false)); } else { auditSet.add(getAuditId(sinkNodeType, false)); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java index c96c12c9ff..c8c087320f 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java @@ -98,7 +98,8 @@ public class InitGroupCompleteListener implements ProcessEventListener { // update status of other related configs if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(groupInfo.getEnableCreateResource())) { - if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) { + if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode()) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) { sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_NORMAL.getCode(), operator); } else { sourceService.updateStatus(groupId, null, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java index 55290b948f..79462f78db 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java @@ -82,7 +82,8 @@ public class UpdateGroupCompleteListener implements ProcessEventListener { } // if the inlong group is dataSync mode, the stream source needs to be processed. - if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) { + if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode()) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) { changeSource4DataSync(groupId, operateType, operator); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java index 4d285488f6..33ed7f86fd 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java @@ -69,7 +69,6 @@ public class ApproveApplyProcessListener implements ProcessEventListener { InlongGroupInfo groupInfo = groupService.get(groupId); GroupResourceProcessForm processForm = new GroupResourceProcessForm(); processForm.setGroupInfo(groupInfo); - String username = context.getOperator(); List<InlongStreamInfo> streamList = streamService.list(groupId); processForm.setStreamInfos(streamList); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java index f566c29728..0ce551d1e6 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java @@ -115,8 +115,10 @@ public class QueueResourceListener implements QueueOperateListener { String operator = context.getOperator(); GroupOperateType operateType = groupProcessForm.getGroupOperateType(); - if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) { - log.warn("skip to execute QueueResourceListener as sync mode for groupId={}", groupId); + if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode()) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) { + log.warn("skip to execute QueueResourceListener as sync mode {} (1 for realtime sync, 2 for offline sync) " + + "for groupId={}", groupInfo.getInlongGroupMode(), groupId); if (GroupOperateType.INIT.equals(operateType)) { this.createQueueForStreams(groupInfo, groupProcessForm.getStreamInfos(), operator); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java index df41823b84..85033b44f7 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java @@ -66,7 +66,8 @@ public class InitStreamCompleteListener implements ProcessEventListener { // Update status of other related configs streamService.updateStatus(groupId, streamId, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator); streamService.updateWithoutCheck(streamInfo.genRequest(), operator); - if (InlongConstants.DATASYNC_REALTIME_MODE.equals(form.getGroupInfo().getInlongGroupMode())) { + if (InlongConstants.DATASYNC_REALTIME_MODE.equals(form.getGroupInfo().getInlongGroupMode()) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(form.getGroupInfo().getInlongGroupMode())) { sourceService.updateStatus(groupId, streamId, SourceStatus.SOURCE_NORMAL.getCode(), operator); } else { sourceService.updateStatus(groupId, streamId, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java index 0fd1800ea4..0aacec11fb 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java @@ -138,6 +138,7 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator { return; } boolean allowUpdate = InlongConstants.DATASYNC_REALTIME_MODE.equals(groupMode) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupMode) || SourceStatus.ALLOWED_UPDATE.contains(entity.getStatus()); if (!allowUpdate) { throw new BusinessException(ErrorCodeEnum.SOURCE_OPT_NOT_ALLOWED, diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java index 08015a4086..c3be04be85 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java @@ -222,7 +222,8 @@ public class StreamSourceServiceImpl implements StreamSourceService { // if the group mode is DATASYNC, just get all related stream sources List<StreamSource> streamSources = this.listSource(groupId, null); - if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) { + if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode()) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) { result = streamSources.stream() .collect(Collectors.groupingBy(StreamSource::getInlongStreamId, HashMap::new, Collectors.toCollection(ArrayList::new)));