This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch dev-offline-sync in repository https://gitbox.apache.org/repos/asf/inlong.git
commit cfd9136286502a4f33d3482461218f814f4f2a6a Author: AloysZhang <aloyszh...@apache.org> AuthorDate: Thu Mar 7 11:44:20 2024 +0800 [INLONG-9781][Manager] Add offline sync task type definition (#9787) --- .../java/org/apache/inlong/manager/client/ut/BaseTest.java | 2 +- .../inlong/manager/common/consts/InlongConstants.java | 3 ++- .../org/apache/inlong/manager/common/enums/GroupMode.java | 14 +++++++++++--- .../manager/plugin/listener/StartupSortListener.java | 2 +- .../apache/inlong/manager/pojo/group/InlongGroupInfo.java | 3 ++- .../inlong/manager/pojo/group/InlongGroupPageRequest.java | 3 ++- .../inlong/manager/pojo/group/InlongGroupRequest.java | 5 +++-- .../inlong/manager/service/core/impl/AuditServiceImpl.java | 6 ++++-- .../manager/service/group/InlongGroupServiceImpl.java | 2 +- .../service/listener/group/InitGroupCompleteListener.java | 2 +- .../listener/group/UpdateGroupCompleteListener.java | 2 +- .../service/listener/queue/QueueResourceListener.java | 2 +- .../listener/stream/InitStreamCompleteListener.java | 2 +- .../manager/service/source/AbstractSourceOperator.java | 2 +- .../manager/service/source/StreamSourceServiceImpl.java | 2 +- 15 files changed, 33 insertions(+), 19 deletions(-) diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java index 4d218918f5..9961c16ef8 100644 --- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java +++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java @@ -110,7 +110,7 @@ public class BaseTest { // set enable zk, create resource, group mode, and cluster tag pulsarInfo.setEnableZookeeper(InlongConstants.DISABLE_ZK); pulsarInfo.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE); - 
pulsarInfo.setInlongGroupMode(InlongConstants.DATASYNC_MODE); + pulsarInfo.setInlongGroupMode(InlongConstants.DATASYNC_REALTIME_MODE); pulsarInfo.setInlongClusterTag("default_cluster"); pulsarInfo.setDailyRecords(10000000); diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java index c3085972fd..581ebb3098 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java @@ -87,7 +87,8 @@ public class InlongConstants { public static final Integer DELETED_STATUS = 10; public static final Integer STANDARD_MODE = 0; - public static final Integer DATASYNC_MODE = 1; + public static final Integer DATASYNC_REALTIME_MODE = 1; + public static final Integer DATASYNC_OFFLINE_MODE = 2; public static final Integer DISABLE_ZK = 0; public static final Integer ENABLE_ZK = 1; diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java index f0db2353b6..d4b417f39a 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java @@ -31,10 +31,18 @@ public enum GroupMode { STANDARD("standard"), /** - * DataSync mode(only Data Synchronization): group init only with sort in InLong Cluster - * StreamSource -> Sort -> StreamSink + * DataSync mode(only Data Synchronization): real-time data sync in stream way, group init only with + * sort in InLong Cluster. 
+ * StreamSource -> Sort -> Sink */ - DATASYNC("datasync"); + DATASYNC("datasync"), + + /** + * DataSync mode(only Data Synchronization): offline data sync in batch way, group init only with sort + * in InLong Cluster. + * BatchSource -> Sort -> Sink + */ + DATASYNC_BATCH("datasync_offline"); @Getter private final String mode; diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java index 0b0e55e369..ab9e9b55d8 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java @@ -73,7 +73,7 @@ public class StartupSortListener implements SortOperateListener { } log.info("add startup group listener for groupId [{}]", groupId); - return InlongConstants.DATASYNC_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode()); + return InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode()); } @Override diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java index 5b3e730ed9..17b66497a9 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java @@ -72,7 +72,8 @@ public abstract class InlongGroupInfo extends BaseInlongGroup { @ApiModelProperty(value = "Whether to enable create resource? 
0: disable, 1: enable") private Integer enableCreateResource; - @ApiModelProperty(value = "Standard mode(include Data Ingestion and Synchronization): 0, DataSync mode(only Data Synchronization): 1") + @ApiModelProperty(value = "Standard mode(include Data Ingestion and Synchronization): 0, DataSync mode(only Data Synchronization, real-time data sync in stream way): 1," + + " DataSync mode(only Data Synchronization, offline data sync in batch way): 2") private Integer inlongGroupMode; @ApiModelProperty(value = "Data report type, default is 0.\n" diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupPageRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupPageRequest.java index 5d7c24b295..4dc2140166 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupPageRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupPageRequest.java @@ -58,7 +58,8 @@ public class InlongGroupPageRequest extends PageRequest { @ApiModelProperty(value = "The inlong cluster tag list") private List<String> clusterTagList; - @ApiModelProperty(value = "Standard mode(include Data Ingestion and Synchronization): 0, DataSync mode(only Data Synchronization): 1") + @ApiModelProperty(value = "Standard mode(include Data Ingestion and Synchronization): 0, DataSync mode(only Data Synchronization, real-time data sync in stream way): 1," + + " DataSync mode(only Data Synchronization, offline data sync in batch way): 2") private Integer inlongGroupMode; @ApiModelProperty(value = "Current user", hidden = true) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java index 30893b7a09..6140bddad5 100644 --- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java @@ -86,8 +86,9 @@ public abstract class InlongGroupRequest extends BaseInlongGroup { @Range(min = 0, max = 1, message = "default is 1, only supports [0: disable, 1: enable]") private Integer enableCreateResource; - @ApiModelProperty(value = "Standard mode(include Data Ingestion and Synchronization): 0, DataSync mode(only Data Synchronization): 1") - @Range(min = 0, max = 1, message = "default is 0, only supports [0: Standard, 1: DataSync]") + @ApiModelProperty(value = "Standard mode(include Data Ingestion and Synchronization): 0, DataSync mode(only Data Synchronization, real-time data sync in stream way): 1," + + " DataSync mode(only Data Synchronization, offline data sync in batch way): 2") + @Range(min = 0, max = 2, message = "default is 0, only supports [0: Standard, 1: DataSync, 2: DataSyncOffline]") private Integer inlongGroupMode; @ApiModelProperty(value = "Data report type, default is 0.\n" diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java index 8230771b76..da24563475 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java @@ -209,7 +209,8 @@ public class AuditServiceImpl implements AuditService { if (CollectionUtils.isEmpty(request.getAuditIds())) { // properly overwrite audit ids by role and stream config - if (InlongConstants.DATASYNC_MODE.equals(groupEntity.getInlongGroupMode())) { + if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupEntity.getInlongGroupMode()) + || 
InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupEntity.getInlongGroupMode())) { auditIdMap.put(getAuditId(sourceNodeType, IndicatorType.RECEIVED_SUCCESS), sourceNodeType); request.setAuditIds(getAuditIds(groupId, streamId, sourceNodeType, sinkNodeType)); } else { @@ -320,7 +321,8 @@ public class AuditServiceImpl implements AuditService { } else { auditSet.add(getAuditId(sinkNodeType, IndicatorType.SEND_SUCCESS)); InlongGroupEntity inlongGroup = inlongGroupMapper.selectByGroupId(groupId); - if (InlongConstants.DATASYNC_MODE.equals(inlongGroup.getInlongGroupMode())) { + if (InlongConstants.DATASYNC_REALTIME_MODE.equals(inlongGroup.getInlongGroupMode()) + || InlongConstants.DATASYNC_OFFLINE_MODE.equals(inlongGroup.getInlongGroupMode())) { auditSet.add(getAuditId(sourceNodeType, IndicatorType.RECEIVED_SUCCESS)); } else { auditSet.add(getAuditId(sinkNodeType, IndicatorType.RECEIVED_SUCCESS)); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java index 49e55cfbd0..78257b35d9 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java @@ -731,7 +731,7 @@ public class InlongGroupServiceImpl implements InlongGroupService { InlongGroupInfo groupInfo = this.get(groupId); // check if the group mode is data sync mode - if (InlongConstants.DATASYNC_MODE.equals(groupInfo.getInlongGroupMode())) { + if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) { String errMSg = String.format("no need to switch sync mode group = {}", groupId); LOGGER.error(errMSg); throw new BusinessException(errMSg); diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java index db8cffbbe1..c96c12c9ff 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java @@ -98,7 +98,7 @@ public class InitGroupCompleteListener implements ProcessEventListener { // update status of other related configs if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(groupInfo.getEnableCreateResource())) { - if (InlongConstants.DATASYNC_MODE.equals(groupInfo.getInlongGroupMode())) { + if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) { sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_NORMAL.getCode(), operator); } else { sourceService.updateStatus(groupId, null, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java index 445ee647ef..a6dd8fc786 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java @@ -96,7 +96,7 @@ public class UpdateGroupCompleteListener implements ProcessEventListener { } // if the inlong group is dataSync mode, the stream source needs to be processed. 
- if (InlongConstants.DATASYNC_MODE.equals(groupInfo.getInlongGroupMode())) { + if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) { changeSource4DataSync(groupId, operateType, operator); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java index 79eb6638cc..f566c29728 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java @@ -115,7 +115,7 @@ public class QueueResourceListener implements QueueOperateListener { String operator = context.getOperator(); GroupOperateType operateType = groupProcessForm.getGroupOperateType(); - if (InlongConstants.DATASYNC_MODE.equals(groupInfo.getInlongGroupMode())) { + if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) { log.warn("skip to execute QueueResourceListener as sync mode for groupId={}", groupId); if (GroupOperateType.INIT.equals(operateType)) { this.createQueueForStreams(groupInfo, groupProcessForm.getStreamInfos(), operator); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java index a739c5da02..df41823b84 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java @@ -66,7 +66,7 @@ public class InitStreamCompleteListener implements 
ProcessEventListener { // Update status of other related configs streamService.updateStatus(groupId, streamId, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator); streamService.updateWithoutCheck(streamInfo.genRequest(), operator); - if (InlongConstants.DATASYNC_MODE.equals(form.getGroupInfo().getInlongGroupMode())) { + if (InlongConstants.DATASYNC_REALTIME_MODE.equals(form.getGroupInfo().getInlongGroupMode())) { sourceService.updateStatus(groupId, streamId, SourceStatus.SOURCE_NORMAL.getCode(), operator); } else { sourceService.updateStatus(groupId, streamId, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java index 9428101c40..83b7ac3ea9 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java @@ -138,7 +138,7 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator { updateFieldOpt(entity, request.getFieldList()); return; } - boolean allowUpdate = InlongConstants.DATASYNC_MODE.equals(groupMode) + boolean allowUpdate = InlongConstants.DATASYNC_REALTIME_MODE.equals(groupMode) || SourceStatus.ALLOWED_UPDATE.contains(entity.getStatus()); if (!allowUpdate) { throw new BusinessException(ErrorCodeEnum.SOURCE_OPT_NOT_ALLOWED, diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java index c252a6a00c..3dd8fa5f26 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java 
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java @@ -248,7 +248,7 @@ public class StreamSourceServiceImpl implements StreamSourceService { // if the group mode is DATASYNC, just get all related stream sources List<StreamSource> streamSources = this.listSource(groupId, null); - if (InlongConstants.DATASYNC_MODE.equals(groupInfo.getInlongGroupMode())) { + if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) { result = streamSources.stream() .collect(Collectors.groupingBy(StreamSource::getInlongStreamId, HashMap::new, Collectors.toCollection(ArrayList::new)));