This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch dev-offline-sync
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 5cdd94d4c9d927342b45570c86523948f115d3b2
Author: AloysZhang <aloyszh...@apache.org>
AuthorDate: Wed Mar 13 16:18:04 2024 +0800

    [INLONG-9813][Manager] Support offline data sync management (#9814)
---
 .../inlong/manager/plugin/listener/StartupSortListener.java | 13 +++++++++----
 .../inlong/manager/service/core/impl/AuditServiceImpl.java  |  6 ++++--
 .../service/listener/group/InitGroupCompleteListener.java   |  3 ++-
 .../service/listener/group/UpdateGroupCompleteListener.java |  3 ++-
 .../listener/group/apply/ApproveApplyProcessListener.java   |  1 -
 .../service/listener/queue/QueueResourceListener.java       |  6 ++++--
 .../service/listener/stream/InitStreamCompleteListener.java |  3 ++-
 .../manager/service/source/AbstractSourceOperator.java      |  1 +
 .../manager/service/source/StreamSourceServiceImpl.java     |  3 ++-
 9 files changed, 26 insertions(+), 13 deletions(-)

diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index ab9e9b55d8..fae5faa278 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -73,7 +73,8 @@ public class StartupSortListener implements 
SortOperateListener {
         }
 
         log.info("add startup group listener for groupId [{}]", groupId);
-        return 
InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode());
+        return 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode())
+                || 
InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode()));
     }
 
     @Override
@@ -141,9 +142,13 @@ public class StartupSortListener implements 
SortOperateListener {
             FlinkOperation flinkOperation = FlinkOperation.getInstance();
             try {
                 flinkOperation.genPath(flinkInfo, dataflow);
-                flinkOperation.start(flinkInfo);
-                log.info("job submit success for groupId = {}, streamId = {}, 
jobId = {}", groupId,
-                        streamInfo.getInlongStreamId(), flinkInfo.getJobId());
+                // only start job for real-time mode
+                if (InlongConstants.DATASYNC_REALTIME_MODE
+                        
.equals(groupResourceForm.getGroupInfo().getInlongGroupMode())) {
+                    flinkOperation.start(flinkInfo);
+                    log.info("job submit success for groupId = {}, streamId = 
{}, jobId = {}", groupId,
+                            streamInfo.getInlongStreamId(), 
flinkInfo.getJobId());
+                }
             } catch (Exception e) {
                 flinkInfo.setException(true);
                 flinkInfo.setExceptionMsg(getExceptionStackMsg(e));
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
index c0086ada23..217f3b5d25 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
@@ -297,7 +297,8 @@ public class AuditServiceImpl implements AuditService {
 
             if (CollectionUtils.isEmpty(request.getAuditIds())) {
                 // properly overwrite audit ids by role and stream config
-                if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupEntity.getInlongGroupMode()))
 {
+                if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupEntity.getInlongGroupMode())
+                        || 
InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupEntity.getInlongGroupMode())) 
{
                     auditIdMap.put(getAuditId(sourceNodeType, false), 
sourceNodeType);
                     request.setAuditIds(getAuditIds(groupId, streamId, 
sourceNodeType, sinkNodeType));
                 } else {
@@ -462,7 +463,8 @@ public class AuditServiceImpl implements AuditService {
         } else {
             auditSet.add(getAuditId(sinkNodeType, true));
             InlongGroupEntity inlongGroup = 
inlongGroupMapper.selectByGroupId(groupId);
-            if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(inlongGroup.getInlongGroupMode()))
 {
+            if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(inlongGroup.getInlongGroupMode())
+                    || 
InlongConstants.DATASYNC_OFFLINE_MODE.equals(inlongGroup.getInlongGroupMode())) 
{
                 auditSet.add(getAuditId(sourceNodeType, false));
             } else {
                 auditSet.add(getAuditId(sinkNodeType, false));
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
index c96c12c9ff..c8c087320f 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
@@ -98,7 +98,8 @@ public class InitGroupCompleteListener implements 
ProcessEventListener {
 
             // update status of other related configs
             if 
(InlongConstants.DISABLE_CREATE_RESOURCE.equals(groupInfo.getEnableCreateResource()))
 {
-                if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) 
{
+                if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())
+                        || 
InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) {
                     sourceService.updateStatus(groupId, null, 
SourceStatus.SOURCE_NORMAL.getCode(), operator);
                 } else {
                     sourceService.updateStatus(groupId, null, 
SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java
index 55290b948f..79462f78db 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java
@@ -82,7 +82,8 @@ public class UpdateGroupCompleteListener implements 
ProcessEventListener {
         }
 
         // if the inlong group is dataSync mode, the stream source needs to be 
processed.
-        if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) 
{
+        if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())
+                || 
InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) {
             changeSource4DataSync(groupId, operateType, operator);
         }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java
index 4d285488f6..33ed7f86fd 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java
@@ -69,7 +69,6 @@ public class ApproveApplyProcessListener implements 
ProcessEventListener {
         InlongGroupInfo groupInfo = groupService.get(groupId);
         GroupResourceProcessForm processForm = new GroupResourceProcessForm();
         processForm.setGroupInfo(groupInfo);
-        String username = context.getOperator();
         List<InlongStreamInfo> streamList = streamService.list(groupId);
         processForm.setStreamInfos(streamList);
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
index f566c29728..0ce551d1e6 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
@@ -115,8 +115,10 @@ public class QueueResourceListener implements 
QueueOperateListener {
 
         String operator = context.getOperator();
         GroupOperateType operateType = groupProcessForm.getGroupOperateType();
-        if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) 
{
-            log.warn("skip to execute QueueResourceListener as sync mode for 
groupId={}", groupId);
+        if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())
+                || 
InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) {
+            log.warn("skip to execute QueueResourceListener as sync mode {} (1 
for realtime sync, 2 for offline sync) "
+                    + "for groupId={}", groupInfo.getInlongGroupMode(), 
groupId);
             if (GroupOperateType.INIT.equals(operateType)) {
                 this.createQueueForStreams(groupInfo, 
groupProcessForm.getStreamInfos(), operator);
             }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java
index df41823b84..85033b44f7 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java
@@ -66,7 +66,8 @@ public class InitStreamCompleteListener implements 
ProcessEventListener {
             // Update status of other related configs
             streamService.updateStatus(groupId, streamId, 
StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator);
             streamService.updateWithoutCheck(streamInfo.genRequest(), 
operator);
-            if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(form.getGroupInfo().getInlongGroupMode()))
 {
+            if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(form.getGroupInfo().getInlongGroupMode())
+                    || 
InlongConstants.DATASYNC_OFFLINE_MODE.equals(form.getGroupInfo().getInlongGroupMode()))
 {
                 sourceService.updateStatus(groupId, streamId, 
SourceStatus.SOURCE_NORMAL.getCode(), operator);
             } else {
                 sourceService.updateStatus(groupId, streamId, 
SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 0fd1800ea4..0aacec11fb 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -138,6 +138,7 @@ public abstract class AbstractSourceOperator implements 
StreamSourceOperator {
             return;
         }
         boolean allowUpdate = 
InlongConstants.DATASYNC_REALTIME_MODE.equals(groupMode)
+                || InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupMode)
                 || SourceStatus.ALLOWED_UPDATE.contains(entity.getStatus());
         if (!allowUpdate) {
             throw new BusinessException(ErrorCodeEnum.SOURCE_OPT_NOT_ALLOWED,
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 08015a4086..c3be04be85 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -222,7 +222,8 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
 
         // if the group mode is DATASYNC, just get all related stream sources
         List<StreamSource> streamSources = this.listSource(groupId, null);
-        if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) 
{
+        if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())
+                || 
InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) {
             result = streamSources.stream()
                     
.collect(Collectors.groupingBy(StreamSource::getInlongStreamId, HashMap::new,
                             Collectors.toCollection(ArrayList::new)));

Reply via email to