This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/branch-1.5 by this push:
     new 4a0175797 [INLONG-7294][Manager] Fix the problem of suspend, restart, delete sort task failed (#7295)
4a0175797 is described below

commit 4a0175797d27b2484bed9d4221208e12cbc0fc3e
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Tue Jan 31 10:04:01 2023 +0800

    [INLONG-7294][Manager] Fix the problem of suspend, restart, delete sort task failed (#7295)
---
 .../manager/plugin/listener/DeleteSortListener.java       | 12 +++++-------
 .../manager/plugin/listener/DeleteStreamListener.java     | 13 +++++--------
 .../manager/plugin/listener/RestartSortListener.java      | 14 +++++---------
 .../manager/plugin/listener/RestartStreamListener.java    | 15 +++++----------
 .../manager/plugin/listener/SuspendSortListener.java      | 14 +++++---------
 .../manager/plugin/listener/SuspendStreamListener.java    | 15 +++++----------
 6 files changed, 30 insertions(+), 53 deletions(-)

diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
index 7eae0e9ca..d898d5d83 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
@@ -89,15 +89,13 @@ public class DeleteSortListener implements SortOperateListener {
         Map<String, String> kvConf = new HashMap<>();
         extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), 
groupExtInfo.getKeyValue()));
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
-        if (StringUtils.isEmpty(sortExt)) {
-            log.warn("no need to delete sort for groupId={}, as the sort 
properties is empty", groupId);
-            return ListenerResult.success();
+        if (StringUtils.isNotEmpty(sortExt)) {
+            Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
+                    JsonUtils.OBJECT_MAPPER.readTree(sortExt), new 
TypeReference<Map<String, String>>() {
+                    });
+            kvConf.putAll(result);
         }
 
-        Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
-                JsonUtils.OBJECT_MAPPER.readTree(sortExt), new 
TypeReference<Map<String, String>>() {
-                });
-        kvConf.putAll(result);
         String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
         if (StringUtils.isBlank(jobId)) {
             String message = String.format("sort job id is empty for 
groupId=%s", groupId);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
index 64006410b..54d4f23b6 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
@@ -94,16 +94,13 @@ public class DeleteStreamListener implements SortOperateListener {
         final String groupId = streamInfo.getInlongGroupId();
         final String streamId = streamInfo.getInlongStreamId();
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
-        if (StringUtils.isEmpty(sortExt)) {
-            log.warn("no need to delete sort for groupId={} streamId={}, as 
the sort properties is empty",
-                    groupId, streamId);
-            return ListenerResult.success();
+        if (StringUtils.isNotEmpty(sortExt)) {
+            Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
+                    JsonUtils.OBJECT_MAPPER.readTree(sortExt), new 
TypeReference<Map<String, String>>() {
+                    });
+            kvConf.putAll(result);
         }
 
-        Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
-                JsonUtils.OBJECT_MAPPER.readTree(sortExt), new 
TypeReference<Map<String, String>>() {
-                });
-        kvConf.putAll(result);
         String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
         if (StringUtils.isBlank(jobId)) {
             String message = String.format("sort job id is empty for 
groupId=%s streamId=%s", groupId, streamId);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
index 0f1fbe4ec..57238a33c 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
@@ -90,17 +90,13 @@ public class RestartSortListener implements SortOperateListener {
         Map<String, String> kvConf = new HashMap<>();
         extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), 
groupExtInfo.getKeyValue()));
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
-        if (StringUtils.isEmpty(sortExt)) {
-            String message = String.format("restart sort failed for groupId 
[%s], as the sort properties is empty",
-                    groupId);
-            log.error(message);
-            return ListenerResult.fail(message);
+        if (StringUtils.isNotEmpty(sortExt)) {
+            Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
+                    JsonUtils.OBJECT_MAPPER.readTree(sortExt), new 
TypeReference<Map<String, String>>() {
+                    });
+            kvConf.putAll(result);
         }
 
-        Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
-                JsonUtils.OBJECT_MAPPER.readTree(sortExt), new 
TypeReference<Map<String, String>>() {
-                });
-        kvConf.putAll(result);
         String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
         if (StringUtils.isBlank(jobId)) {
             String message = String.format("sort job id is empty for groupId 
[%s]", groupId);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
index 5ca86cc53..322854476 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
@@ -99,18 +99,13 @@ public class RestartStreamListener implements SortOperateListener {
         final String groupId = streamInfo.getInlongGroupId();
         final String streamId = streamInfo.getInlongStreamId();
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
-        if (StringUtils.isEmpty(sortExt)) {
-            String message = String.format(
-                    "restart sort failed for groupId [%s] and streamId [%s], 
as the sort properties is empty",
-                    groupId, streamId);
-            log.error(message);
-            return ListenerResult.fail(message);
+        if (StringUtils.isNotEmpty(sortExt)) {
+            Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
+                    JsonUtils.OBJECT_MAPPER.readTree(sortExt), new 
TypeReference<Map<String, String>>() {
+                    });
+            kvConf.putAll(result);
         }
 
-        Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
-                JsonUtils.OBJECT_MAPPER.readTree(sortExt), new 
TypeReference<Map<String, String>>() {
-                });
-        kvConf.putAll(result);
         String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
         if (StringUtils.isBlank(jobId)) {
             String message = String.format("sort job id is empty for groupId 
[%s] streamId [%s]", groupId, streamId);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
index d8a17d2d5..fd68e364c 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
@@ -89,17 +89,13 @@ public class SuspendSortListener implements SortOperateListener {
         Map<String, String> kvConf = new HashMap<>();
         extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), 
groupExtInfo.getKeyValue()));
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
-        if (StringUtils.isEmpty(sortExt)) {
-            String message = String.format("suspend sort failed for groupId 
[%s], as the sort properties is empty",
-                    groupId);
-            log.error(message);
-            return ListenerResult.fail(message);
+        if (StringUtils.isNotEmpty(sortExt)) {
+            Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
+                    JsonUtils.OBJECT_MAPPER.readTree(sortExt), new 
TypeReference<Map<String, String>>() {
+                    });
+            kvConf.putAll(result);
         }
 
-        Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
-                JsonUtils.OBJECT_MAPPER.readTree(sortExt), new 
TypeReference<Map<String, String>>() {
-                });
-        kvConf.putAll(result);
         String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
         if (StringUtils.isBlank(jobId)) {
             String message = String.format("sort job id is empty for groupId 
[%s]", groupId);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
index 672590d67..b3f7821c1 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
@@ -95,18 +95,13 @@ public class SuspendStreamListener implements SortOperateListener {
         final String groupId = streamInfo.getInlongGroupId();
         final String streamId = streamInfo.getInlongStreamId();
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
-        if (StringUtils.isEmpty(sortExt)) {
-            String message = String.format(
-                    "suspend sort failed for groupId [%s] streamId [%s], as 
the sort properties is empty",
-                    groupId, streamId);
-            log.error(message);
-            return ListenerResult.fail(message);
+        if (StringUtils.isNotEmpty(sortExt)) {
+            Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
+                    JsonUtils.OBJECT_MAPPER.readTree(sortExt), new 
TypeReference<Map<String, String>>() {
+                    });
+            kvConf.putAll(result);
         }
 
-        Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue(
-                JsonUtils.OBJECT_MAPPER.readTree(sortExt), new 
TypeReference<Map<String, String>>() {
-                });
-        kvConf.putAll(result);
         String jobId = kvConf.get(InlongConstants.SORT_JOB_ID);
         if (StringUtils.isBlank(jobId)) {
             String message = String.format("sort job id is empty for groupId 
[%s] streamId [%s]", groupId, streamId);

Reply via email to