This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 58b2fc963c [INLONG-9818][Manager] Decode Msg based on the manager's 
configuration (#9819)
58b2fc963c is described below

commit 58b2fc963cad0715ab0d4a82de50a9c95fd37159
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Thu Mar 14 16:45:53 2024 +0800

    [INLONG-9818][Manager] Decode Msg based on the manager's configuration 
(#9819)
---
 .../manager/service/resource/queue/pulsar/PulsarOperator.java  | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
index 99653c05c1..ccabb716f1 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
@@ -453,10 +453,12 @@ public class PulsarOperator {
                             messagePosition);
             PulsarMessageInfo messageInfo = 
PulsarUtils.getMessageFromHttpResponse(httpResponse, topicPartition);
             Map<String, String> headers = messageInfo.getProperties();
-            int wrapTypeId = 
Integer.parseInt(headers.getOrDefault(InlongConstants.MSG_ENCODE_VER,
-                    Integer.toString(MessageWrapType.INLONG_MSG_V0.getId())));
-            DeserializeOperator deserializeOperator = 
deserializeOperatorFactory.getInstance(
-                    MessageWrapType.valueOf(wrapTypeId));
+            MessageWrapType messageWrapType = 
MessageWrapType.forType(streamInfo.getWrapType());
+            if (headers.get(InlongConstants.MSG_ENCODE_VER) != null) {
+                messageWrapType =
+                        
MessageWrapType.valueOf(Integer.parseInt(headers.get(InlongConstants.MSG_ENCODE_VER)));
+            }
+            DeserializeOperator deserializeOperator = 
deserializeOperatorFactory.getInstance(messageWrapType);
             briefMQMessages.addAll(deserializeOperator.decodeMsg(streamInfo, 
messageInfo.getBody(),
                     headers, index));
         } catch (Exception e) {

Reply via email to