This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 12fb5689ce3affd1e87f9f6c88bea03d4b269ac7
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Tue Jul 18 14:07:00 2023 +0800

    [INLONG-8552][Manager] Fix the data preview obtained is not the latest 
message (#8553)
    
    (cherry picked from commit 293d9dc0db3141b206c689eddeec1ebd975989b4)
---
 .../service/resource/queue/pulsar/PulsarOperator.java     | 15 +++------------
 1 file changed, 3 insertions(+), 12 deletions(-)

diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
index e81cdd56cf..227a95bb3c 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
@@ -390,26 +390,17 @@ public class PulsarOperator {
             Integer messageCount, InlongStreamInfo streamInfo) {
         LOGGER.info("begin to query message for topic {}, subName={}", 
topicFullName, subName);
 
-        List<Message<byte[]>> messages;
-        try {
-            messages = pulsarAdmin.topics().peekMessages(topicFullName, 
subName, messageCount);
-        } catch (PulsarAdminException e) {
-            String errMsg = "failed to query peek messages: ";
-            LOGGER.error(errMsg, e);
-            throw new BusinessException(errMsg + e.getMessage());
-        }
-
-        int index = 0;
         List<BriefMQMessage> messageList = new ArrayList<>();
-        for (Message<byte[]> pulsarMessage : messages) {
+        for (int i = 0; i < messageCount; i++) {
             try {
+                Message<byte[]> pulsarMessage = 
pulsarAdmin.topics().examineMessage(topicFullName, "latest", i);
                 Map<String, String> headers = pulsarMessage.getProperties();
                 int wrapTypeId = 
Integer.parseInt(headers.getOrDefault(InlongConstants.MSG_ENCODE_VER,
                         
Integer.toString(DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getId())));
                 DeserializeOperator deserializeOperator = 
deserializeOperatorFactory.getInstance(
                         DataProxyMsgEncType.valueOf(wrapTypeId));
                 messageList.addAll(
-                        deserializeOperator.decodeMsg(streamInfo, 
pulsarMessage.getData(), headers, ++index));
+                        deserializeOperator.decodeMsg(streamInfo, 
pulsarMessage.getData(), headers, i));
             } catch (Exception e) {
                 String errMsg = "decode msg error: ";
                 LOGGER.error(errMsg, e);

Reply via email to