This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch branch-1.5 in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/branch-1.5 by this push: new 4a0175797 [INLONG-7294][Manager] Fix the problem of suspend, restart, delete sort task failed (#7295) 4a0175797 is described below commit 4a0175797d27b2484bed9d4221208e12cbc0fc3e Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Tue Jan 31 10:04:01 2023 +0800 [INLONG-7294][Manager] Fix the problem of suspend, restart, delete sort task failed (#7295) --- .../manager/plugin/listener/DeleteSortListener.java | 12 +++++------- .../manager/plugin/listener/DeleteStreamListener.java | 13 +++++-------- .../manager/plugin/listener/RestartSortListener.java | 14 +++++--------- .../manager/plugin/listener/RestartStreamListener.java | 15 +++++---------- .../manager/plugin/listener/SuspendSortListener.java | 14 +++++--------- .../manager/plugin/listener/SuspendStreamListener.java | 15 +++++---------- 6 files changed, 30 insertions(+), 53 deletions(-) diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java index 7eae0e9ca..d898d5d83 100644 --- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java +++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java @@ -89,15 +89,13 @@ public class DeleteSortListener implements SortOperateListener { Map<String, String> kvConf = new HashMap<>(); extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue())); String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES); - if (StringUtils.isEmpty(sortExt)) { - log.warn("no need to delete sort for groupId={}, as the sort properties is empty", groupId); - return ListenerResult.success(); + if (StringUtils.isNotEmpty(sortExt)) { + Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue( + JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() { + }); + kvConf.putAll(result); } - Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue( - JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() { - }); - kvConf.putAll(result); String jobId = kvConf.get(InlongConstants.SORT_JOB_ID); if (StringUtils.isBlank(jobId)) { String message = String.format("sort job id is empty for groupId=%s", groupId); diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java index 64006410b..54d4f23b6 100644 --- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java +++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java @@ -94,16 +94,13 @@ public class DeleteStreamListener implements SortOperateListener { final String groupId = streamInfo.getInlongGroupId(); final String streamId = streamInfo.getInlongStreamId(); String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES); - if (StringUtils.isEmpty(sortExt)) { - log.warn("no need to delete sort for groupId={} streamId={}, as the sort properties is empty", - groupId, streamId); - return ListenerResult.success(); + if (StringUtils.isNotEmpty(sortExt)) { + Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue( + JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() { + }); + kvConf.putAll(result); } - Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue( - JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() { - }); - kvConf.putAll(result); String jobId = kvConf.get(InlongConstants.SORT_JOB_ID); if (StringUtils.isBlank(jobId)) { String message = String.format("sort job id is empty for groupId=%s streamId=%s", groupId, streamId); diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java index 0f1fbe4ec..57238a33c 100644 --- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java +++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java @@ -90,17 +90,13 @@ public class RestartSortListener implements SortOperateListener { Map<String, String> kvConf = new HashMap<>(); extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue())); String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES); - if (StringUtils.isEmpty(sortExt)) { - String message = String.format("restart sort failed for groupId [%s], as the sort properties is empty", - groupId); - log.error(message); - return ListenerResult.fail(message); + if (StringUtils.isNotEmpty(sortExt)) { + Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue( + JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() { + }); + kvConf.putAll(result); } - Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue( - JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() { - }); - kvConf.putAll(result); String jobId = kvConf.get(InlongConstants.SORT_JOB_ID); if (StringUtils.isBlank(jobId)) { String message = String.format("sort job id is empty for groupId [%s]", groupId); diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java index 5ca86cc53..322854476 100644 --- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java +++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java @@ -99,18 +99,13 @@ public class RestartStreamListener implements SortOperateListener { final String groupId = streamInfo.getInlongGroupId(); final String streamId = streamInfo.getInlongStreamId(); String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES); - if (StringUtils.isEmpty(sortExt)) { - String message = String.format( - "restart sort failed for groupId [%s] and streamId [%s], as the sort properties is empty", - groupId, streamId); - log.error(message); - return ListenerResult.fail(message); + if (StringUtils.isNotEmpty(sortExt)) { + Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue( + JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() { + }); + kvConf.putAll(result); } - Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue( - JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() { - }); - kvConf.putAll(result); String jobId = kvConf.get(InlongConstants.SORT_JOB_ID); if (StringUtils.isBlank(jobId)) { String message = String.format("sort job id is empty for groupId [%s] streamId [%s]", groupId, streamId); diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java index d8a17d2d5..fd68e364c 100644 --- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java +++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java @@ -89,17 +89,13 @@ public class SuspendSortListener implements SortOperateListener { Map<String, String> kvConf = new HashMap<>(); extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue())); String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES); - if (StringUtils.isEmpty(sortExt)) { - String message = String.format("suspend sort failed for groupId [%s], as the sort properties is empty", - groupId); - log.error(message); - return ListenerResult.fail(message); + if (StringUtils.isNotEmpty(sortExt)) { + Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue( + JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() { + }); + kvConf.putAll(result); } - Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue( - JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() { - }); - kvConf.putAll(result); String jobId = kvConf.get(InlongConstants.SORT_JOB_ID); if (StringUtils.isBlank(jobId)) { String message = String.format("sort job id is empty for groupId [%s]", groupId); diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java index 672590d67..b3f7821c1 100644 --- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java +++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java @@ -95,18 +95,13 @@ public class SuspendStreamListener implements SortOperateListener { final String groupId = streamInfo.getInlongGroupId(); final String streamId = streamInfo.getInlongStreamId(); String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES); - if (StringUtils.isEmpty(sortExt)) { - String message = String.format( - "suspend sort failed for groupId [%s] streamId [%s], as the sort properties is empty", - groupId, streamId); - log.error(message); - return ListenerResult.fail(message); + if (StringUtils.isNotEmpty(sortExt)) { + Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue( + JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() { + }); + kvConf.putAll(result); } - Map<String, String> result = JsonUtils.OBJECT_MAPPER.convertValue( - JsonUtils.OBJECT_MAPPER.readTree(sortExt), new TypeReference<Map<String, String>>() { - }); - kvConf.putAll(result); String jobId = kvConf.get(InlongConstants.SORT_JOB_ID); if (StringUtils.isBlank(jobId)) { String message = String.format("sort job id is empty for groupId [%s] streamId [%s]", groupId, streamId);