This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 58b2fc963c [INLONG-9818][Manager] Decode Msg based on the manager's configuration (#9819) 58b2fc963c is described below commit 58b2fc963cad0715ab0d4a82de50a9c95fd37159 Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Thu Mar 14 16:45:53 2024 +0800 [INLONG-9818][Manager] Decode Msg based on the manager's configuration (#9819) --- .../manager/service/resource/queue/pulsar/PulsarOperator.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java index 99653c05c1..ccabb716f1 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java @@ -453,10 +453,12 @@ public class PulsarOperator { messagePosition); PulsarMessageInfo messageInfo = PulsarUtils.getMessageFromHttpResponse(httpResponse, topicPartition); Map<String, String> headers = messageInfo.getProperties(); - int wrapTypeId = Integer.parseInt(headers.getOrDefault(InlongConstants.MSG_ENCODE_VER, - Integer.toString(MessageWrapType.INLONG_MSG_V0.getId()))); - DeserializeOperator deserializeOperator = deserializeOperatorFactory.getInstance( - MessageWrapType.valueOf(wrapTypeId)); + MessageWrapType messageWrapType = MessageWrapType.forType(streamInfo.getWrapType()); + if (headers.get(InlongConstants.MSG_ENCODE_VER) != null) { + messageWrapType = + MessageWrapType.valueOf(Integer.parseInt(headers.get(InlongConstants.MSG_ENCODE_VER))); + } + DeserializeOperator deserializeOperator = deserializeOperatorFactory.getInstance(messageWrapType); briefMQMessages.addAll(deserializeOperator.decodeMsg(streamInfo, messageInfo.getBody(), headers, index)); } catch (Exception e) {