This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch dev-offline-sync
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 875e07cd74e83ee2e0011eb84f6b81c53578601e
Author: AloysZhang <aloyszh...@apache.org>
AuthorDate: Thu Mar 7 11:44:20 2024 +0800

    [INLONG-9781][Manager] Add offline sync task type definition (#9787)
---
 .../java/org/apache/inlong/manager/client/ut/BaseTest.java |  2 +-
 .../inlong/manager/common/consts/InlongConstants.java      |  3 ++-
 .../org/apache/inlong/manager/common/enums/GroupMode.java  | 14 +++++++++++---
 .../manager/plugin/listener/StartupSortListener.java       |  2 +-
 .../apache/inlong/manager/pojo/group/InlongGroupInfo.java  |  3 ++-
 .../inlong/manager/pojo/group/InlongGroupPageRequest.java  |  3 ++-
 .../inlong/manager/pojo/group/InlongGroupRequest.java      |  5 +++--
 .../inlong/manager/service/core/impl/AuditServiceImpl.java |  4 ++--
 .../manager/service/group/InlongGroupServiceImpl.java      |  2 +-
 .../service/listener/group/InitGroupCompleteListener.java  |  2 +-
 .../listener/group/UpdateGroupCompleteListener.java        |  2 +-
 .../service/listener/queue/QueueResourceListener.java      |  2 +-
 .../listener/stream/InitStreamCompleteListener.java        |  2 +-
 .../manager/service/source/AbstractSourceOperator.java     |  2 +-
 .../manager/service/source/StreamSourceServiceImpl.java    |  2 +-
 15 files changed, 31 insertions(+), 19 deletions(-)

diff --git 
a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
 
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
index 4d218918f5..9961c16ef8 100644
--- 
a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
+++ 
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
@@ -110,7 +110,7 @@ public class BaseTest {
         // set enable zk, create resource, group mode, and cluster tag
         pulsarInfo.setEnableZookeeper(InlongConstants.DISABLE_ZK);
         
pulsarInfo.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
-        pulsarInfo.setInlongGroupMode(InlongConstants.DATASYNC_MODE);
+        pulsarInfo.setInlongGroupMode(InlongConstants.DATASYNC_REALTIME_MODE);
         pulsarInfo.setInlongClusterTag("default_cluster");
 
         pulsarInfo.setDailyRecords(10000000);
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index c3085972fd..581ebb3098 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -87,7 +87,8 @@ public class InlongConstants {
     public static final Integer DELETED_STATUS = 10;
 
     public static final Integer STANDARD_MODE = 0;
-    public static final Integer DATASYNC_MODE = 1;
+    public static final Integer DATASYNC_REALTIME_MODE = 1;
+    public static final Integer DATASYNC_OFFLINE_MODE = 2;
 
     public static final Integer DISABLE_ZK = 0;
     public static final Integer ENABLE_ZK = 1;
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java
index f0db2353b6..d4b417f39a 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java
@@ -31,10 +31,18 @@ public enum GroupMode {
     STANDARD("standard"),
 
     /**
-     * DataSync mode(only Data Synchronization): group init only with sort in 
InLong Cluster
-     * StreamSource -> Sort -> StreamSink
+     * DataSync mode (data synchronization only): real-time, streaming sync.
+     * The group is initialized with only Sort in the InLong cluster.
+     * StreamSource -> Sort -> Sink
      */
-    DATASYNC("datasync");
+    DATASYNC("datasync"),
+
+    /**
+     * DataSync mode (data synchronization only): offline, batch sync.
+     * The group is initialized with only Sort in the InLong cluster.
+     * BatchSource -> Sort -> Sink
+     */
+    DATASYNC_BATCH("datasync_offline");
 
     @Getter
     private final String mode;
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index 0b0e55e369..ab9e9b55d8 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -73,7 +73,7 @@ public class StartupSortListener implements 
SortOperateListener {
         }
 
         log.info("add startup group listener for groupId [{}]", groupId);
-        return 
InlongConstants.DATASYNC_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode());
+        return 
InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode());
     }
 
     @Override
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java
index 7c272d0911..272583778e 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java
@@ -72,7 +72,8 @@ public abstract class InlongGroupInfo extends BaseInlongGroup 
{
     @ApiModelProperty(value = "Whether to enable create resource? 0: disable, 
1: enable")
     private Integer enableCreateResource;
 
-    @ApiModelProperty(value = "Standard mode(include Data Ingestion and 
Synchronization): 0, DataSync mode(only Data Synchronization): 1")
+    @ApiModelProperty(value = "Standard mode(include Data Ingestion and 
Synchronization): 0, DataSync mode(only Data Synchronization, real-time data 
sync in stream way): 1,"
+            + " DataSync mode(only Data Synchronization, offline data sync in 
batch way): 2")
     private Integer inlongGroupMode;
 
     @ApiModelProperty(value = "Data report type, default is 0.\n"
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupPageRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupPageRequest.java
index 5d7c24b295..4dc2140166 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupPageRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupPageRequest.java
@@ -58,7 +58,8 @@ public class InlongGroupPageRequest extends PageRequest {
     @ApiModelProperty(value = "The inlong cluster tag list")
     private List<String> clusterTagList;
 
-    @ApiModelProperty(value = "Standard mode(include Data Ingestion and 
Synchronization): 0, DataSync mode(only Data Synchronization): 1")
+    @ApiModelProperty(value = "Standard mode(include Data Ingestion and 
Synchronization): 0, DataSync mode(only Data Synchronization, real-time data 
sync in stream way): 1,"
+            + " DataSync mode(only Data Synchronization, offline data sync in 
batch way): 2")
     private Integer inlongGroupMode;
 
     @ApiModelProperty(value = "Current user", hidden = true)
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
index 30893b7a09..6140bddad5 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
@@ -86,8 +86,9 @@ public abstract class InlongGroupRequest extends 
BaseInlongGroup {
     @Range(min = 0, max = 1, message = "default is 1, only supports [0: 
disable, 1: enable]")
     private Integer enableCreateResource;
 
-    @ApiModelProperty(value = "Standard mode(include Data Ingestion and 
Synchronization): 0, DataSync mode(only Data Synchronization): 1")
-    @Range(min = 0, max = 1, message = "default is 0, only supports [0: 
Standard, 1: DataSync]")
+    @ApiModelProperty(value = "Standard mode(include Data Ingestion and 
Synchronization): 0, DataSync mode(only Data Synchronization, real-time data 
sync in stream way): 1,"
+            + " DataSync mode(only Data Synchronization, offline data sync in 
batch way): 2")
+    @Range(min = 0, max = 2, message = "default is 0, only supports [0: 
Standard, 1: DataSync, 2: DataSyncOffline]")
     private Integer inlongGroupMode;
 
     @ApiModelProperty(value = "Data report type, default is 0.\n"
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
index 281dafe170..c0086ada23 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
@@ -297,7 +297,7 @@ public class AuditServiceImpl implements AuditService {
 
             if (CollectionUtils.isEmpty(request.getAuditIds())) {
                 // properly overwrite audit ids by role and stream config
-                if 
(InlongConstants.DATASYNC_MODE.equals(groupEntity.getInlongGroupMode())) {
+                if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupEntity.getInlongGroupMode()))
 {
                     auditIdMap.put(getAuditId(sourceNodeType, false), 
sourceNodeType);
                     request.setAuditIds(getAuditIds(groupId, streamId, 
sourceNodeType, sinkNodeType));
                 } else {
@@ -462,7 +462,7 @@ public class AuditServiceImpl implements AuditService {
         } else {
             auditSet.add(getAuditId(sinkNodeType, true));
             InlongGroupEntity inlongGroup = 
inlongGroupMapper.selectByGroupId(groupId);
-            if 
(InlongConstants.DATASYNC_MODE.equals(inlongGroup.getInlongGroupMode())) {
+            if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(inlongGroup.getInlongGroupMode()))
 {
                 auditSet.add(getAuditId(sourceNodeType, false));
             } else {
                 auditSet.add(getAuditId(sinkNodeType, false));
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index 2d87b74995..6dfc2259d3 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -706,7 +706,7 @@ public class InlongGroupServiceImpl implements 
InlongGroupService {
         InlongGroupInfo groupInfo = this.get(groupId);
 
         // check if the group mode is data sync mode
-        if 
(InlongConstants.DATASYNC_MODE.equals(groupInfo.getInlongGroupMode())) {
+        if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) 
{
             String errMSg = String.format("no need to switch sync mode group = 
{}", groupId);
             LOGGER.error(errMSg);
             throw new BusinessException(errMSg);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
index db8cffbbe1..c96c12c9ff 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
@@ -98,7 +98,7 @@ public class InitGroupCompleteListener implements 
ProcessEventListener {
 
             // update status of other related configs
             if 
(InlongConstants.DISABLE_CREATE_RESOURCE.equals(groupInfo.getEnableCreateResource()))
 {
-                if 
(InlongConstants.DATASYNC_MODE.equals(groupInfo.getInlongGroupMode())) {
+                if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) 
{
                     sourceService.updateStatus(groupId, null, 
SourceStatus.SOURCE_NORMAL.getCode(), operator);
                 } else {
                     sourceService.updateStatus(groupId, null, 
SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java
index d21ca83c92..55290b948f 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java
@@ -82,7 +82,7 @@ public class UpdateGroupCompleteListener implements 
ProcessEventListener {
         }
 
         // if the inlong group is dataSync mode, the stream source needs to be 
processed.
-        if 
(InlongConstants.DATASYNC_MODE.equals(groupInfo.getInlongGroupMode())) {
+        if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) 
{
             changeSource4DataSync(groupId, operateType, operator);
         }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
index 79eb6638cc..f566c29728 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
@@ -115,7 +115,7 @@ public class QueueResourceListener implements 
QueueOperateListener {
 
         String operator = context.getOperator();
         GroupOperateType operateType = groupProcessForm.getGroupOperateType();
-        if 
(InlongConstants.DATASYNC_MODE.equals(groupInfo.getInlongGroupMode())) {
+        if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) 
{
             log.warn("skip to execute QueueResourceListener as sync mode for 
groupId={}", groupId);
             if (GroupOperateType.INIT.equals(operateType)) {
                 this.createQueueForStreams(groupInfo, 
groupProcessForm.getStreamInfos(), operator);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java
index a739c5da02..df41823b84 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java
@@ -66,7 +66,7 @@ public class InitStreamCompleteListener implements 
ProcessEventListener {
             // Update status of other related configs
             streamService.updateStatus(groupId, streamId, 
StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator);
             streamService.updateWithoutCheck(streamInfo.genRequest(), 
operator);
-            if 
(InlongConstants.DATASYNC_MODE.equals(form.getGroupInfo().getInlongGroupMode()))
 {
+            if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(form.getGroupInfo().getInlongGroupMode()))
 {
                 sourceService.updateStatus(groupId, streamId, 
SourceStatus.SOURCE_NORMAL.getCode(), operator);
             } else {
                 sourceService.updateStatus(groupId, streamId, 
SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 00f85052fd..0fd1800ea4 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -137,7 +137,7 @@ public abstract class AbstractSourceOperator implements 
StreamSourceOperator {
             updateFieldOpt(entity, request.getFieldList());
             return;
         }
-        boolean allowUpdate = InlongConstants.DATASYNC_MODE.equals(groupMode)
+        boolean allowUpdate = 
InlongConstants.DATASYNC_REALTIME_MODE.equals(groupMode)
                 || SourceStatus.ALLOWED_UPDATE.contains(entity.getStatus());
         if (!allowUpdate) {
             throw new BusinessException(ErrorCodeEnum.SOURCE_OPT_NOT_ALLOWED,
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index a8a01224b8..08015a4086 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -222,7 +222,7 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
 
         // if the group mode is DATASYNC, just get all related stream sources
         List<StreamSource> streamSources = this.listSource(groupId, null);
-        if 
(InlongConstants.DATASYNC_MODE.equals(groupInfo.getInlongGroupMode())) {
+        if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) 
{
             result = streamSources.stream()
                     
.collect(Collectors.groupingBy(StreamSource::getInlongStreamId, HashMap::new,
                             Collectors.toCollection(ArrayList::new)));

Reply via email to