This is an automated email from the ASF dual-hosted git repository. vernedeng pushed a commit to branch branch-1.8 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 12fb5689ce3affd1e87f9f6c88bea03d4b269ac7
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Tue Jul 18 14:07:00 2023 +0800

    [INLONG-8552][Manager] Fix the data preview obtained is not the latest message (#8553)

    (cherry picked from commit 293d9dc0db3141b206c689eddeec1ebd975989b4)
---
 .../service/resource/queue/pulsar/PulsarOperator.java | 15 +++------------
 1 file changed, 3 insertions(+), 12 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
index e81cdd56cf..227a95bb3c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
@@ -390,26 +390,17 @@ public class PulsarOperator {
             Integer messageCount, InlongStreamInfo streamInfo) {
         LOGGER.info("begin to query message for topic {}, subName={}", topicFullName, subName);
 
-        List<Message<byte[]>> messages;
-        try {
-            messages = pulsarAdmin.topics().peekMessages(topicFullName, subName, messageCount);
-        } catch (PulsarAdminException e) {
-            String errMsg = "failed to query peek messages: ";
-            LOGGER.error(errMsg, e);
-            throw new BusinessException(errMsg + e.getMessage());
-        }
-
-        int index = 0;
         List<BriefMQMessage> messageList = new ArrayList<>();
-        for (Message<byte[]> pulsarMessage : messages) {
+        for (int i = 0; i < messageCount; i++) {
             try {
+                Message<byte[]> pulsarMessage = pulsarAdmin.topics().examineMessage(topicFullName, "latest", i);
                 Map<String, String> headers = pulsarMessage.getProperties();
                 int wrapTypeId = Integer.parseInt(headers.getOrDefault(InlongConstants.MSG_ENCODE_VER,
                         Integer.toString(DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getId())));
                 DeserializeOperator deserializeOperator = deserializeOperatorFactory.getInstance(
                         DataProxyMsgEncType.valueOf(wrapTypeId));
                 messageList.addAll(
-                        deserializeOperator.decodeMsg(streamInfo, pulsarMessage.getData(), headers, ++index));
+                        deserializeOperator.decodeMsg(streamInfo, pulsarMessage.getData(), headers, i));
             } catch (Exception e) {
                 String errMsg = "decode msg error: ";
                 LOGGER.error(errMsg, e);