This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch dev-offline-sync
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 2674a5645c1f0f0e821ec26efb1351c4297dd056
Author: AloysZhang <aloyszh...@apache.org>
AuthorDate: Wed Mar 13 16:18:04 2024 +0800

    [INLONG-9813][Manager] Support offline data sync management (#9814)
---
 .../apache/inlong/manager/plugin/listener/StartupSortListener.java  | 3 ++-
 .../manager/service/listener/group/InitGroupCompleteListener.java   | 3 ++-
 .../manager/service/listener/group/UpdateGroupCompleteListener.java | 3 ++-
 .../manager/service/listener/queue/QueueResourceListener.java       | 6 ++++--
 .../manager/service/listener/stream/InitStreamCompleteListener.java | 3 ++-
 .../inlong/manager/service/source/AbstractSourceOperator.java       | 1 +
 .../inlong/manager/service/source/StreamSourceServiceImpl.java      | 3 ++-
 7 files changed, 15 insertions(+), 7 deletions(-)

diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index 9997c2abd2..a84d3f5868 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -61,7 +61,8 @@ public class StartupSortListener implements 
SortOperateListener {
         }
 
         log.info("add startup group listener for groupId [{}]", groupId);
-        return 
InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode());
+        return 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode())
+                || 
InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode()));
     }
 
     @Override
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
index c96c12c9ff..c8c087320f 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
@@ -98,7 +98,8 @@ public class InitGroupCompleteListener implements 
ProcessEventListener {
 
             // update status of other related configs
             if 
(InlongConstants.DISABLE_CREATE_RESOURCE.equals(groupInfo.getEnableCreateResource()))
 {
-                if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) 
{
+                if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())
+                        || 
InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) {
                     sourceService.updateStatus(groupId, null, 
SourceStatus.SOURCE_NORMAL.getCode(), operator);
                 } else {
                     sourceService.updateStatus(groupId, null, 
SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java
index a6dd8fc786..1a6e1f7fe1 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java
@@ -96,7 +96,8 @@ public class UpdateGroupCompleteListener implements 
ProcessEventListener {
         }
 
         // if the inlong group is dataSync mode, the stream source needs to be 
processed.
-        if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) 
{
+        if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())
+                || 
InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) {
             changeSource4DataSync(groupId, operateType, operator);
         }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
index f566c29728..0ce551d1e6 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
@@ -115,8 +115,10 @@ public class QueueResourceListener implements 
QueueOperateListener {
 
         String operator = context.getOperator();
         GroupOperateType operateType = groupProcessForm.getGroupOperateType();
-        if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) 
{
-            log.warn("skip to execute QueueResourceListener as sync mode for 
groupId={}", groupId);
+        if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())
+                || 
InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) {
+            log.warn("skip to execute QueueResourceListener as sync mode {} (1 
for realtime sync, 2 for offline sync) "
+                    + "for groupId={}", groupInfo.getInlongGroupMode(), 
groupId);
             if (GroupOperateType.INIT.equals(operateType)) {
                 this.createQueueForStreams(groupInfo, 
groupProcessForm.getStreamInfos(), operator);
             }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java
index df41823b84..85033b44f7 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java
@@ -66,7 +66,8 @@ public class InitStreamCompleteListener implements 
ProcessEventListener {
             // Update status of other related configs
             streamService.updateStatus(groupId, streamId, 
StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator);
             streamService.updateWithoutCheck(streamInfo.genRequest(), 
operator);
-            if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(form.getGroupInfo().getInlongGroupMode()))
 {
+            if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(form.getGroupInfo().getInlongGroupMode())
+                    || 
InlongConstants.DATASYNC_OFFLINE_MODE.equals(form.getGroupInfo().getInlongGroupMode()))
 {
                 sourceService.updateStatus(groupId, streamId, 
SourceStatus.SOURCE_NORMAL.getCode(), operator);
             } else {
                 sourceService.updateStatus(groupId, streamId, 
SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 83b7ac3ea9..39d3845209 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -139,6 +139,7 @@ public abstract class AbstractSourceOperator implements 
StreamSourceOperator {
             return;
         }
         boolean allowUpdate = 
InlongConstants.DATASYNC_REALTIME_MODE.equals(groupMode)
+                || InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupMode)
                 || SourceStatus.ALLOWED_UPDATE.contains(entity.getStatus());
         if (!allowUpdate) {
             throw new BusinessException(ErrorCodeEnum.SOURCE_OPT_NOT_ALLOWED,
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index a610fb6214..34cc62a250 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -248,7 +248,8 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
 
         // if the group mode is DATASYNC, just get all related stream sources
         List<StreamSource> streamSources = this.listSource(groupId, null);
-        if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())) 
{
+        if 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode())
+                || 
InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) {
             result = streamSources.stream()
                     
.collect(Collectors.groupingBy(StreamSource::getInlongStreamId, HashMap::new,
                             Collectors.toCollection(ArrayList::new)));

Reply via email to