This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d20a29cb14 [INLONG-10638][Manager] Data preview supports filtering function (#10639)
d20a29cb14 is described below

commit d20a29cb146400889958fd7b4088f92b5e4fbf8a
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Wed Jul 17 19:11:08 2024 +0800

    [INLONG-10638][Manager] Data preview supports filtering function (#10639)
---
 .../api/inner/client/InlongStreamClient.java       | 17 +++++--
 .../client/api/service/InlongStreamApi.java        |  5 +-
 .../inlong/manager/common/util/JsonUtils.java      | 20 ++++++++
 .../manager/pojo/stream/QueryMessageRequest.java   | 54 ++++++++++++++++++++++
 .../service/message/DeserializeOperator.java       | 34 +++++++++++++-
 .../message/InlongMsgDeserializeOperator.java      | 10 ++--
 .../service/message/PbMsgDeserializeOperator.java  | 14 ++++--
 .../service/message/RawMsgDeserializeOperator.java | 11 +++--
 .../resource/queue/QueueResourceOperator.java      |  5 +-
 .../resource/queue/kafka/KafkaOperator.java        | 11 +++--
 .../queue/kafka/KafkaQueueResourceOperator.java    |  7 +--
 .../resource/queue/pulsar/PulsarOperator.java      | 15 +++---
 .../queue/pulsar/PulsarQueueResourceOperator.java  | 11 +++--
 .../queue/pulsar/QueryLatestMessagesRunnable.java  |  9 ++--
 .../resource/queue/tubemq/TubeMQOperator.java      |  8 ++--
 .../queue/tubemq/TubeMQQueueResourceOperator.java  |  5 +-
 .../service/stream/InlongStreamService.java        |  7 ++-
 .../service/stream/InlongStreamServiceImpl.java    |  9 ++--
 .../resource/queue/kafka/KafkaOperatorTest.java    | 20 +++++---
 .../web/controller/InlongStreamController.java     | 11 ++---
 20 files changed, 209 insertions(+), 74 deletions(-)

diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
index 89ca7eef68..e5573d2db1 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
@@ -21,6 +21,7 @@ import 
org.apache.inlong.manager.client.api.ClientConfiguration;
 import org.apache.inlong.manager.client.api.service.InlongStreamApi;
 import org.apache.inlong.manager.client.api.util.ClientUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.common.BatchResult;
 import org.apache.inlong.manager.pojo.common.PageResult;
@@ -30,11 +31,15 @@ import 
org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
 import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
 import org.apache.inlong.manager.pojo.stream.StreamField;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.commons.lang3.tuple.Pair;
 
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 
 import static 
org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
 import static 
org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL;
@@ -295,11 +300,15 @@ public class InlongStreamClient {
         return parseFields(request);
     }
 
-    public List<BriefMQMessage> queryMessage(String groupId, String streamId, 
Integer messageCount) {
-        Preconditions.expectNotBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
-        Preconditions.expectNotBlank(streamId, 
ErrorCodeEnum.STREAM_ID_IS_EMPTY);
+    public List<BriefMQMessage> queryMessage(QueryMessageRequest request) {
+        Preconditions.expectNotBlank(request.getGroupId(), 
ErrorCodeEnum.GROUP_ID_IS_EMPTY);
+        Preconditions.expectNotBlank(request.getStreamId(), 
ErrorCodeEnum.STREAM_ID_IS_EMPTY);
+        Map<String, Object> requestMap = JsonUtils.parseObject(request,
+                new TypeReference<Map<String, Object>>() {
+                });
+        requestMap.entrySet().removeIf(entry -> 
Objects.isNull(entry.getValue()));
         Response<List<BriefMQMessage>> response = ClientUtils.executeHttpCall(
-                inlongStreamApi.listMessages(groupId, streamId, messageCount));
+                inlongStreamApi.listMessages(requestMap));
         ClientUtils.assertRespSuccess(response);
         return response.getData();
     }
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
index 6b47d8aeb2..ac31b09d20 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
@@ -34,8 +34,10 @@ import retrofit2.http.GET;
 import retrofit2.http.POST;
 import retrofit2.http.Path;
 import retrofit2.http.Query;
+import retrofit2.http.QueryMap;
 
 import java.util.List;
+import java.util.Map;
 
 public interface InlongStreamApi {
 
@@ -88,6 +90,5 @@ public interface InlongStreamApi {
     Call<Response<List<StreamField>>> parseFields(@Body ParseFieldRequest 
parseFieldRequest);
 
     @GET("stream/listMessages")
-    Call<Response<List<BriefMQMessage>>> listMessages(@Query("groupId") String 
groupId,
-            @Query("streamId") String streamId, @Query("messageCount") Integer 
messageCount);
+    Call<Response<List<BriefMQMessage>>> listMessages(@QueryMap Map<String, 
Object> query);
 }
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/JsonUtils.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/JsonUtils.java
index de7f5647c1..8f21ac6364 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/JsonUtils.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/JsonUtils.java
@@ -121,6 +121,26 @@ public class JsonUtils {
         }
     }
 
+    /**
+     * Convert an arbitrary object to the target Java type.
+     * <p/>
+     * Unlike the overload that parses a JSON string, this method takes an object that
+     * has already been built and converts it via {@code ObjectMapper#convertValue}.
+     *
+     * @param object object
+     * @param typeReference The generic type is actually the parsed java type
+     * @return java object;
+     * @throws JsonException when parse error
+     */
+    public static <T> T parseObject(Object object, TypeReference<T> 
typeReference) {
+        try {
+            return OBJECT_MAPPER.convertValue(object, typeReference);
+        } catch (Exception e) {
+            log.error("json parse err for: " + object, e);
+            throw new JsonException(e);
+        }
+    }
+
     /**
      * Parse JSON string to Java object.
      * <p/>
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/QueryMessageRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/QueryMessageRequest.java
new file mode 100644
index 0000000000..3c939222e5
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/QueryMessageRequest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.stream;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Query message request
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("query message request")
+public class QueryMessageRequest {
+
+    @ApiModelProperty(value = "Inlong group id")
+    private String groupId;
+
+    @ApiModelProperty(value = "Inlong stream id")
+    private String streamId;
+
+    @ApiModelProperty(value = "Message count")
+    private Integer messageCount = 100;
+
+    @ApiModelProperty(value = "Field name")
+    private String fieldName;
+
+    @ApiModelProperty(value = "Operation type, like !=, =, like")
+    private String operationType;
+
+    @ApiModelProperty(value = "TargetValue")
+    private String targetValue;
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperator.java
index c62d3f6862..1fb898da99 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperator.java
@@ -21,10 +21,15 @@ import org.apache.inlong.common.enums.MessageWrapType;
 import 
org.apache.inlong.common.pojo.sort.dataflow.deserialization.DeserializationConfig;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
+import org.apache.inlong.manager.pojo.consume.BriefMQMessage.FieldInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
+
+import org.apache.commons.lang3.StringUtils;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * Deserialize of message operator
@@ -55,8 +60,8 @@ public interface DeserializeOperator {
      * @param index message index
      * @return list of brief mq message info
      */
-    default List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo,
-            byte[] msgBytes, Map<String, String> headers, int index) throws 
Exception {
+    default List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo, 
List<BriefMQMessage> briefMQMessages,
+            byte[] msgBytes, Map<String, String> headers, int index, 
QueryMessageRequest request) throws Exception {
         return null;
     }
 
@@ -64,4 +69,29 @@ public interface DeserializeOperator {
         throw new BusinessException(String.format("current type not support 
DeserializationInfo for wrapType=%s",
                 streamInfo.getWrapType()));
     }
+
+    default Boolean checkIfFilter(QueryMessageRequest request, List<FieldInfo> 
streamFieldList) {
+        if (StringUtils.isBlank(request.getFieldName()) || 
StringUtils.isBlank(request.getOperationType())
+                || StringUtils.isBlank(request.getTargetValue())) {
+            return false;
+        }
+        boolean ifFilter = false;
+        FieldInfo fieldInfo = streamFieldList.stream()
+                .filter(v -> Objects.equals(v.getFieldName(), 
request.getFieldName())).findFirst()
+                .orElse(null);
+        if (fieldInfo != null) {
+            switch (request.getOperationType()) {
+                case "=":
+                    ifFilter = !Objects.equals(request.getTargetValue(), 
fieldInfo.getFieldValue());
+                    break;
+                case "!=":
+                    ifFilter = Objects.equals(request.getTargetValue(), 
fieldInfo.getFieldValue());
+                    break;
+                case "like":
+                    ifFilter = fieldInfo.getFieldValue() != null
+                            && 
!fieldInfo.getFieldValue().contains(request.getTargetValue());
+            }
+        }
+        return ifFilter;
+    }
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java
index b04931db0f..10778bbc20 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java
@@ -28,6 +28,7 @@ import 
org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
 import org.apache.inlong.manager.pojo.consume.BriefMQMessage.FieldInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
 import org.apache.inlong.manager.service.datatype.DataTypeOperator;
 import org.apache.inlong.manager.service.datatype.DataTypeOperatorFactory;
 
@@ -36,7 +37,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.nio.charset.Charset;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -55,11 +55,10 @@ public class InlongMsgDeserializeOperator implements 
DeserializeOperator {
     }
 
     @Override
-    public List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo, byte[] 
msgBytes, Map<String, String> headers,
-            int index) {
+    public List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo, 
List<BriefMQMessage> messageList,
+            byte[] msgBytes, Map<String, String> headers, int index, 
QueryMessageRequest request) {
         String groupId = headers.get(AttributeConstants.GROUP_ID);
         String streamId = headers.get(AttributeConstants.STREAM_ID);
-        List<BriefMQMessage> messageList = new ArrayList<>();
         InLongMsg inLongMsg = InLongMsg.parseFrom(msgBytes);
         for (String attr : inLongMsg.getAttrs()) {
             Map<String, String> attrMap = StringUtil.splitKv(attr, 
INLONGMSG_ATTR_ENTRY_DELIMITER,
@@ -87,6 +86,9 @@ public class InlongMsgDeserializeOperator implements 
DeserializeOperator {
                     DataTypeOperator dataTypeOperator =
                             
dataTypeOperatorFactory.getInstance(DataTypeEnum.forType(streamInfo.getDataType()));
                     List<FieldInfo> streamFieldList = 
dataTypeOperator.parseFields(body, streamInfo);
+                    if (checkIfFilter(request, streamFieldList)) {
+                        continue;
+                    }
                     BriefMQMessage message = BriefMQMessage.builder()
                             .id(index)
                             .inlongGroupId(groupId)
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java
index 79760cf737..e8e32ab80a 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java
@@ -27,6 +27,7 @@ import 
org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
 import org.apache.inlong.manager.pojo.consume.BriefMQMessage.FieldInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
 import org.apache.inlong.manager.service.datatype.DataTypeOperator;
 import org.apache.inlong.manager.service.datatype.DataTypeOperatorFactory;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.INLONG_COMPRESSED_TYPE;
@@ -58,8 +59,8 @@ public class PbMsgDeserializeOperator implements 
DeserializeOperator {
     }
 
     @Override
-    public List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo,
-            byte[] msgBytes, Map<String, String> headers, int index) throws 
Exception {
+    public List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo, 
List<BriefMQMessage> briefMQMessages,
+            byte[] msgBytes, Map<String, String> headers, int index, 
QueryMessageRequest request) throws Exception {
         int compressType = 
Integer.parseInt(headers.getOrDefault(COMPRESS_TYPE_KEY, "0"));
         byte[] values = msgBytes;
         switch (compressType) {
@@ -74,10 +75,12 @@ public class PbMsgDeserializeOperator implements 
DeserializeOperator {
             default:
                 throw new IllegalArgumentException("Unknown compress type:" + 
compressType);
         }
-        return transformMessageObjs(MessageObjs.parseFrom(values), streamInfo, 
index);
+        
briefMQMessages.addAll(transformMessageObjs(MessageObjs.parseFrom(values), 
streamInfo, index, request));
+        return briefMQMessages;
     }
 
-    private List<BriefMQMessage> transformMessageObjs(MessageObjs messageObjs, 
InlongStreamInfo streamInfo, int index) {
+    private List<BriefMQMessage> transformMessageObjs(MessageObjs messageObjs, 
InlongStreamInfo streamInfo, int index,
+            QueryMessageRequest request) {
         if (null == messageObjs) {
             return null;
         }
@@ -96,6 +99,9 @@ public class PbMsgDeserializeOperator implements 
DeserializeOperator {
                 DataTypeOperator dataTypeOperator =
                         
dataTypeOperatorFactory.getInstance(DataTypeEnum.forType(streamInfo.getDataType()));
                 List<FieldInfo> streamFieldList = 
dataTypeOperator.parseFields(body, streamInfo);
+                if (checkIfFilter(request, streamFieldList)) {
+                    continue;
+                }
                 BriefMQMessage message = BriefMQMessage.builder()
                         .id(index)
                         
.inlongGroupId(headers.get(AttributeConstants.GROUP_ID))
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
index db025d2c2f..d55c7d6cad 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java
@@ -24,6 +24,7 @@ import 
org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
 import org.apache.inlong.manager.pojo.consume.BriefMQMessage.FieldInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
 import org.apache.inlong.manager.service.datatype.DataTypeOperator;
 import org.apache.inlong.manager.service.datatype.DataTypeOperatorFactory;
 
@@ -49,8 +50,8 @@ public class RawMsgDeserializeOperator implements 
DeserializeOperator {
     }
 
     @Override
-    public List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo,
-            byte[] msgBytes, Map<String, String> headers, int index) {
+    public List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo, 
List<BriefMQMessage> briefMQMessages,
+            byte[] msgBytes, Map<String, String> headers, int index, 
QueryMessageRequest request) {
         String groupId = headers.get(AttributeConstants.GROUP_ID);
         String streamId = headers.get(AttributeConstants.STREAM_ID);
         long msgTime = Long.parseLong(headers.getOrDefault(MSG_TIME_KEY, "0"));
@@ -59,6 +60,9 @@ public class RawMsgDeserializeOperator implements 
DeserializeOperator {
             DataTypeOperator dataTypeOperator =
                     
dataTypeOperatorFactory.getInstance(DataTypeEnum.forType(streamInfo.getDataType()));
             List<FieldInfo> fieldList = dataTypeOperator.parseFields(body, 
streamInfo);
+            if (checkIfFilter(request, fieldList)) {
+                return briefMQMessages;
+            }
             BriefMQMessage briefMQMessage = BriefMQMessage.builder()
                     .id(index)
                     .inlongGroupId(groupId)
@@ -69,7 +73,8 @@ public class RawMsgDeserializeOperator implements 
DeserializeOperator {
                     .body(body)
                     .fieldList(fieldList)
                     .build();
-            return Collections.singletonList(briefMQMessage);
+            briefMQMessages.addAll(Collections.singletonList(briefMQMessage));
+            return briefMQMessages;
         } catch (Exception e) {
             String errMsg = String.format("decode msg failed for groupId=%s, 
streamId=%s", groupId, streamId);
             log.error(errMsg, e);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/QueueResourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/QueueResourceOperator.java
index 4664f670a6..528884c996 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/QueueResourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/QueueResourceOperator.java
@@ -22,6 +22,7 @@ import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
 
 import javax.validation.constraints.NotBlank;
 import javax.validation.constraints.NotNull;
@@ -81,12 +82,12 @@ public interface QueueResourceOperator {
      *
      * @param groupInfo inlong group info
      * @param streamInfo inlong stream info
-     * @param messageCount count of messages to query
+     * @param request query message request
      * @throws Exception any exception if occurred
      * @return query brief mq message info
      */
     default List<BriefMQMessage> queryLatestMessages(InlongGroupInfo 
groupInfo, InlongStreamInfo streamInfo,
-            Integer messageCount) throws Exception {
+            QueryMessageRequest request) throws Exception {
         return null;
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
index 763a654c26..b49e52317a 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
@@ -24,6 +24,7 @@ import 
org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
 import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
 import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
 import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl;
 import org.apache.inlong.manager.service.message.DeserializeOperator;
 import org.apache.inlong.manager.service.message.DeserializeOperatorFactory;
@@ -113,19 +114,19 @@ public class KafkaOperator {
      * Query topic message for the given Kafka cluster.
      */
     public List<BriefMQMessage> queryLatestMessage(KafkaClusterInfo 
clusterInfo, String topicName,
-            String consumeGroup, Integer messageCount, InlongStreamInfo 
streamInfo) {
+            String consumeGroup, InlongStreamInfo streamInfo, 
QueryMessageRequest request) {
         LOGGER.debug("begin to query message for topic {} in cluster: {}", 
topicName, clusterInfo);
 
         Properties properties = getProperties(clusterInfo.getUrl(), 
consumeGroup);
         KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(properties);
-        return getLatestMessage(consumer, topicName, messageCount, streamInfo);
+        return getLatestMessage(consumer, topicName, streamInfo, request);
     }
 
     @VisibleForTesting
     public List<BriefMQMessage> getLatestMessage(Consumer<byte[], byte[]> 
consumer, String topicName,
-            Integer messageCount, InlongStreamInfo streamInfo) {
+            InlongStreamInfo streamInfo, QueryMessageRequest request) {
         List<BriefMQMessage> messageList = new ArrayList<>();
-
+        Integer messageCount = request.getMessageCount();
         try {
             List<PartitionInfo> partitionInfoList = 
consumer.partitionsFor(topicName);
             List<TopicPartition> topicPartitionList = 
partitionInfoList.stream()
@@ -162,7 +163,7 @@ public class KafkaOperator {
                             
MessageWrapType.valueOf(Integer.parseInt(headers.get(InlongConstants.MSG_ENCODE_VER)));
                 }
                 DeserializeOperator deserializeOperator = 
deserializeOperatorFactory.getInstance(messageWrapType);
-                messageList.addAll(deserializeOperator.decodeMsg(streamInfo, 
record.value(), headers, index));
+                deserializeOperator.decodeMsg(streamInfo, messageList, 
record.value(), headers, index, request);
                 if (messageList.size() >= messageCount) {
                     break;
                 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaQueueResourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaQueueResourceOperator.java
index cc0fa83dd2..6708bd8ecd 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaQueueResourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaQueueResourceOperator.java
@@ -32,6 +32,7 @@ import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.consume.InlongConsumeService;
 import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
@@ -192,7 +193,7 @@ public class KafkaQueueResourceOperator implements 
QueueResourceOperator {
 
     @Override
     public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo, 
InlongStreamInfo streamInfo,
-            Integer messageCount) {
+            QueryMessageRequest request) {
         ClusterInfo clusterInfo = 
clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.KAFKA);
 
         String topicName = streamInfo.getMqResource();
@@ -204,8 +205,8 @@ public class KafkaQueueResourceOperator implements 
QueueResourceOperator {
 
         String consumeGroup =
                 String.format(KAFKA_CONSUMER_GROUP_REALTIME_REVIEW, 
groupInfo.getInlongClusterTag(), topicName);
-        return kafkaOperator.queryLatestMessage((KafkaClusterInfo) 
clusterInfo, topicName, consumeGroup, messageCount,
-                streamInfo);
+        return kafkaOperator.queryLatestMessage((KafkaClusterInfo) 
clusterInfo, topicName, consumeGroup, streamInfo,
+                request);
     }
 
     @Override
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
index 5c9e9f5d10..e5e8d4c815 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
@@ -34,6 +34,7 @@ import 
org.apache.inlong.manager.pojo.queue.pulsar.PulsarTenantInfo;
 import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo;
 import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicMetadata;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
 import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl;
 import org.apache.inlong.manager.service.message.DeserializeOperator;
 import org.apache.inlong.manager.service.message.DeserializeOperatorFactory;
@@ -409,16 +410,16 @@ public class PulsarOperator {
      * Query topic message for the given pulsar cluster.
      */
     public List<BriefMQMessage> queryLatestMessage(PulsarClusterInfo 
pulsarClusterInfo, String topicFullName,
-            Integer messageCount, InlongStreamInfo streamInfo, boolean serial) 
{
+            QueryMessageRequest request, InlongStreamInfo streamInfo, boolean 
serial) {
         LOGGER.info("begin to query message for topic {}", topicFullName);
         List<BriefMQMessage> messageList = new ArrayList<>();
         int partitionCount = getPartitionCount(pulsarClusterInfo, 
topicFullName);
-        for (int messageIndex = 0; messageIndex < messageCount; 
messageIndex++) {
+        for (int messageIndex = 0; messageIndex < 100; messageIndex++) {
             int currentPartitionNum = messageIndex % partitionCount;
             int messagePosition = messageIndex / partitionCount + 1;
             String topicNameOfPartition = 
buildTopicNameOfPartition(topicFullName, currentPartitionNum, serial);
-            messageList.addAll(queryMessageFromPulsar(topicNameOfPartition, 
pulsarClusterInfo, messageIndex,
-                    streamInfo, messagePosition));
+            messageList.addAll(queryMessageFromPulsar(topicNameOfPartition, 
pulsarClusterInfo, messageIndex, streamInfo,
+                    messagePosition, request));
         }
         LOGGER.info("success query message for topic={}", topicFullName);
         return messageList;
@@ -444,8 +445,7 @@ public class PulsarOperator {
      * Query pulsar message.
      */
     private List<BriefMQMessage> queryMessageFromPulsar(String topicPartition, 
PulsarClusterInfo pulsarClusterInfo,
-            int index,
-            InlongStreamInfo streamInfo, int messagePosition) {
+            int index, InlongStreamInfo streamInfo, int messagePosition, 
QueryMessageRequest request) {
         List<BriefMQMessage> briefMQMessages = new ArrayList<>();
         try {
             ResponseEntity<byte[]> httpResponse =
@@ -462,8 +462,7 @@ public class PulsarOperator {
                         
MessageWrapType.valueOf(Integer.parseInt(headers.get(InlongConstants.MSG_ENCODE_VER)));
             }
             DeserializeOperator deserializeOperator = 
deserializeOperatorFactory.getInstance(messageWrapType);
-            briefMQMessages.addAll(deserializeOperator.decodeMsg(streamInfo, 
messageInfo.getBody(),
-                    headers, index));
+            deserializeOperator.decodeMsg(streamInfo, briefMQMessages, 
messageInfo.getBody(), headers, index, request);
         } catch (Exception e) {
             LOGGER.warn("query message from pulsar error for groupId = {}, 
streamId = {}",
                     streamInfo.getInlongGroupId(),
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
index 6d86760ea4..efbdeabf20 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
@@ -36,6 +36,7 @@ import 
org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
 import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.consume.InlongConsumeService;
 import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
@@ -305,23 +306,23 @@ public class PulsarQueueResourceOperator implements 
QueueResourceOperator {
     /**
      * Query latest message from pulsar
      */
-    public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo,
-            InlongStreamInfo streamInfo, Integer messageCount) throws 
Exception {
+    public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo, 
InlongStreamInfo streamInfo,
+            QueryMessageRequest request) throws Exception {
         List<ClusterInfo> pulsarClusterList = 
clusterService.listByTagAndType(groupInfo.getInlongClusterTag(),
                 ClusterType.PULSAR);
         List<BriefMQMessage> briefMQMessages = 
Collections.synchronizedList(new ArrayList<>());
-        QueryCountDownLatch queryLatch = new QueryCountDownLatch(messageCount, 
pulsarClusterList.size());
+        QueryCountDownLatch queryLatch = new 
QueryCountDownLatch(request.getMessageCount(), pulsarClusterList.size());
         InlongPulsarInfo inlongPulsarInfo = ((InlongPulsarInfo) groupInfo);
         for (ClusterInfo clusterInfo : pulsarClusterList) {
             QueryLatestMessagesRunnable task = new 
QueryLatestMessagesRunnable(inlongPulsarInfo, streamInfo,
-                    (PulsarClusterInfo) clusterInfo, pulsarOperator, 
messageCount, briefMQMessages, queryLatch);
+                    (PulsarClusterInfo) clusterInfo, pulsarOperator, request, 
briefMQMessages, queryLatch);
             this.executor.execute(task);
         }
         queryLatch.await(30, TimeUnit.SECONDS);
         log.info("success query pulsar message for groupId={}, streamId={}", 
streamInfo.getInlongGroupId(),
                 streamInfo.getInlongStreamId());
 
-        int finalMsgCount = Math.min(messageCount, briefMQMessages.size());
+        int finalMsgCount = Math.min(request.getMessageCount(), 
briefMQMessages.size());
         if (finalMsgCount > 0) {
             return briefMQMessages.subList(0, finalMsgCount);
         } else {
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java
index caf61e0ebe..4fb6b58e49 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java
@@ -22,6 +22,7 @@ import 
org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
 import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -37,7 +38,7 @@ public class QueryLatestMessagesRunnable implements Runnable {
     private InlongStreamInfo streamInfo;
     private PulsarClusterInfo clusterInfo;
     private PulsarOperator pulsarOperator;
-    private Integer messageCount;
+    private QueryMessageRequest queryMessageRequest;
     private List<BriefMQMessage> briefMQMessages;
     private QueryCountDownLatch latch;
 
@@ -45,14 +46,14 @@ public class QueryLatestMessagesRunnable implements 
Runnable {
             InlongStreamInfo streamInfo,
             PulsarClusterInfo clusterInfo,
             PulsarOperator pulsarOperator,
-            Integer messageCount,
+            QueryMessageRequest queryMessageRequest,
             List<BriefMQMessage> briefMQMessages,
             QueryCountDownLatch latch) {
         this.inlongPulsarInfo = inlongPulsarInfo;
         this.streamInfo = streamInfo;
         this.clusterInfo = clusterInfo;
         this.pulsarOperator = pulsarOperator;
-        this.messageCount = messageCount;
+        this.queryMessageRequest = queryMessageRequest;
         this.briefMQMessages = briefMQMessages;
         this.latch = latch;
     }
@@ -69,7 +70,7 @@ public class QueryLatestMessagesRunnable implements Runnable {
         String fullTopicName = tenant + "/" + namespace + "/" + topicName;
         boolean serial = 
InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(inlongPulsarInfo.getQueueModule());
         List<BriefMQMessage> messages =
-                pulsarOperator.queryLatestMessage(clusterInfo, fullTopicName, 
messageCount, streamInfo, serial);
+                pulsarOperator.queryLatestMessage(clusterInfo, fullTopicName, 
queryMessageRequest, streamInfo, serial);
         if (CollectionUtils.isNotEmpty(messages)) {
             briefMQMessages.addAll(messages);
             this.latch.countDown(messages.size());
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java
index cdbc4da86d..050de07805 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java
@@ -30,6 +30,7 @@ import 
org.apache.inlong.manager.pojo.queue.tubemq.TubeHttpResponse;
 import org.apache.inlong.manager.pojo.queue.tubemq.TubeMessageResponse;
 import 
org.apache.inlong.manager.pojo.queue.tubemq.TubeMessageResponse.TubeDataInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
 import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl;
 import org.apache.inlong.manager.service.message.DeserializeOperator;
 import org.apache.inlong.manager.service.message.DeserializeOperatorFactory;
@@ -269,7 +270,7 @@ public class TubeMQOperator {
      * Query topic message for the given tubemq cluster.
      */
     public List<BriefMQMessage> queryLastMessage(TubeClusterInfo tubeCluster, 
String topicName,
-            Integer msgCount, InlongStreamInfo streamInfo) {
+            InlongStreamInfo streamInfo, QueryMessageRequest request) {
         LOGGER.info("begin to query message for topic {} in cluster: {}", 
topicName, tubeCluster);
         String masterUrl = tubeCluster.getMasterWebUrl();
         TubeBrokerInfo brokerView = this.getBrokerInfo(masterUrl);
@@ -286,7 +287,8 @@ public class TubeMQOperator {
                 throw new BusinessException("TubeMQ master url or TubeMQ topic 
cannot be null");
             }
 
-            String url = "http://" + brokerUrl + QUERY_MESSAGE_PATH + 
TOPIC_NAME + topicName + MSG_COUNT + msgCount;
+            String url = "http://" + brokerUrl + QUERY_MESSAGE_PATH + 
TOPIC_NAME + topicName + MSG_COUNT
+                    + request.getMessageCount();
             TubeMessageResponse response = HttpUtils.request(restTemplate, 
url, HttpMethod.GET,
                     null, new HttpHeaders(), TubeMessageResponse.class);
             if (response.getErrCode() != SUCCESS_CODE && response.getErrCode() 
!= 200) {
@@ -310,7 +312,7 @@ public class TubeMQOperator {
                 }
                 byte[] messageData = 
Base64.getDecoder().decode(tubeDataInfo.getData());
                 DeserializeOperator deserializeOperator = 
deserializeOperatorFactory.getInstance(messageWrapType);
-                messageList.addAll(deserializeOperator.decodeMsg(streamInfo, 
messageData, map, index));
+                deserializeOperator.decodeMsg(streamInfo, messageList, 
messageData, map, index, request);
             }
 
             LOGGER.info("success query messages for topic={}", topicName);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQQueueResourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQQueueResourceOperator.java
index f6cf034e67..14dbd4f202 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQQueueResourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQQueueResourceOperator.java
@@ -30,6 +30,7 @@ import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.consume.InlongConsumeService;
 import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
@@ -132,14 +133,14 @@ public class TubeMQQueueResourceOperator implements 
QueueResourceOperator {
     }
 
     public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo, 
InlongStreamInfo streamInfo,
-            Integer messageCount) {
+            QueryMessageRequest request) {
         Preconditions.expectNotNull(groupInfo, "inlong group info cannot be 
null");
 
         String clusterTag = groupInfo.getInlongClusterTag();
         TubeClusterInfo tubeCluster = (TubeClusterInfo) 
clusterService.getOne(clusterTag, null, ClusterType.TUBEMQ);
         String topicName = groupInfo.getMqResource();
 
-        return tubeMQOperator.queryLastMessage(tubeCluster, topicName, 
messageCount, streamInfo);
+        return tubeMQOperator.queryLastMessage(tubeCluster, topicName, 
streamInfo, request);
     }
 
     @Override
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
index 96002d0ebd..013a6546b5 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
@@ -27,6 +27,7 @@ import 
org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest;
 import org.apache.inlong.manager.pojo.stream.InlongStreamRequest;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
 import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.pojo.user.UserInfo;
 
@@ -241,12 +242,10 @@ public interface InlongStreamService {
     /**
      * List brief mq message info
      *
-     * @param groupId inlong group id
-     * @param streamId inlong stream id
-     * @param messageCount Count of messages to query'
+     * @param request query message request
      * @param operator operator
      * @return list of brief mq message info
      */
-    List<BriefMQMessage> listMessages(String groupId, String streamId, Integer 
messageCount, String operator);
+    List<BriefMQMessage> listMessages(QueryMessageRequest request, String 
operator);
 
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index 46db20a04f..93692aeb28 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -55,6 +55,7 @@ import 
org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest;
 import org.apache.inlong.manager.pojo.stream.InlongStreamRequest;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
 import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.pojo.user.UserInfo;
 import org.apache.inlong.manager.pojo.user.UserRoleCode;
@@ -948,15 +949,15 @@ public class InlongStreamServiceImpl implements 
InlongStreamService {
     }
 
     @Override
-    public List<BriefMQMessage> listMessages(String groupId, String streamId, 
Integer messageCount, String operator) {
-        InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
+    public List<BriefMQMessage> listMessages(QueryMessageRequest request, 
String operator) {
+        InlongGroupEntity groupEntity = 
groupMapper.selectByGroupId(request.getGroupId());
         InlongGroupOperator instance = 
groupOperatorFactory.getInstance(groupEntity.getMqType());
         InlongGroupInfo groupInfo = instance.getFromEntity(groupEntity);
-        InlongStreamInfo inlongStreamInfo = get(groupId, streamId);
+        InlongStreamInfo inlongStreamInfo = get(request.getGroupId(), 
request.getStreamId());
         List<BriefMQMessage> messageList = new ArrayList<>();
         QueueResourceOperator queueOperator = 
queueOperatorFactory.getInstance(groupEntity.getMqType());
         try {
-            messageList = queueOperator.queryLatestMessages(groupInfo, 
inlongStreamInfo, messageCount);
+            messageList = queueOperator.queryLatestMessages(groupInfo, 
inlongStreamInfo, request);
         } catch (Exception e) {
             LOGGER.error("query message error ", e);
         }
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperatorTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperatorTest.java
index acc6ebb3f5..d0b88d0ec7 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperatorTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperatorTest.java
@@ -22,6 +22,7 @@ import org.apache.inlong.common.enums.MessageWrapType;
 import org.apache.inlong.common.msg.InLongMsg;
 import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
 import org.apache.inlong.manager.service.ServiceBaseTest;
 
 import com.google.common.collect.Lists;
@@ -116,15 +117,18 @@ public class KafkaOperatorTest extends ServiceBaseTest {
 
     @Test
     void testGetKafkaLatestMessage() {
-        List<BriefMQMessage> messages = 
kafkaOperator.getLatestMessage(consumer, TOPIC_NAME, 10, streamInfo);
+        QueryMessageRequest request = new QueryMessageRequest();
+        request.setMessageCount(10);
+        List<BriefMQMessage> messages = 
kafkaOperator.getLatestMessage(consumer, TOPIC_NAME, streamInfo, request);
         Assertions.assertEquals(0, messages.size());
     }
 
     @Test
     void testGetKafkaLatestMessage_1() {
         addRecord(Collections.singletonList("inlong"));
-
-        List<BriefMQMessage> messages = 
kafkaOperator.getLatestMessage(consumer, TOPIC_NAME, 10, streamInfo);
+        QueryMessageRequest request = new QueryMessageRequest();
+        request.setMessageCount(10);
+        List<BriefMQMessage> messages = 
kafkaOperator.getLatestMessage(consumer, TOPIC_NAME, streamInfo, request);
         Assertions.assertEquals(1, messages.size());
         Assertions.assertEquals("inlong", messages.get(0).getBody());
     }
@@ -133,8 +137,9 @@ public class KafkaOperatorTest extends ServiceBaseTest {
     void testGetKafkaLatestMessage_2() {
         List<String> records = IntStream.range(0, 9).mapToObj(index -> "name_" 
+ index).collect(Collectors.toList());
         addRecord(records);
-
-        List<BriefMQMessage> messages = 
kafkaOperator.getLatestMessage(consumer, TOPIC_NAME, 10, streamInfo);
+        QueryMessageRequest request = new QueryMessageRequest();
+        request.setMessageCount(10);
+        List<BriefMQMessage> messages = 
kafkaOperator.getLatestMessage(consumer, TOPIC_NAME, streamInfo, request);
         Assertions.assertEquals(9, messages.size());
     }
 
@@ -142,8 +147,9 @@ public class KafkaOperatorTest extends ServiceBaseTest {
     void testGetKafkaLatestMessage_4() {
         List<String> records = IntStream.range(0, 21).mapToObj(index -> 
"name_" + index).collect(Collectors.toList());
         addRecord(records);
-
-        List<BriefMQMessage> messages = 
kafkaOperator.getLatestMessage(consumer, TOPIC_NAME, 10, streamInfo);
+        QueryMessageRequest request = new QueryMessageRequest();
+        request.setMessageCount(10);
+        List<BriefMQMessage> messages = 
kafkaOperator.getLatestMessage(consumer, TOPIC_NAME, streamInfo, request);
         Assertions.assertEquals(10, messages.size());
     }
 
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
index ce2d9175c8..e5e0dd5343 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
@@ -33,6 +33,7 @@ import 
org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest;
 import org.apache.inlong.manager.pojo.stream.InlongStreamRequest;
+import org.apache.inlong.manager.pojo.stream.QueryMessageRequest;
 import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.pojo.user.LoginUserUtils;
 import org.apache.inlong.manager.pojo.user.UserRoleCode;
@@ -255,15 +256,9 @@ public class InlongStreamController {
 
     @RequestMapping(value = "/stream/listMessages", method = RequestMethod.GET)
     @ApiOperation(value = "Get inlong stream message")
-    @ApiImplicitParams({
-            @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, 
required = true),
-            @ApiImplicitParam(name = "streamId", dataTypeClass = String.class, 
required = true),
-            @ApiImplicitParam(name = "messageCount", dataTypeClass = 
String.class, required = true)
-    })
-    public Response<List<BriefMQMessage>> listMessages(@RequestParam String 
groupId, @RequestParam String streamId,
-            @RequestParam Integer messageCount) {
+    public Response<List<BriefMQMessage>> listMessages(QueryMessageRequest 
request) {
         String username = LoginUserUtils.getLoginUser().getName();
-        return Response.success(streamService.listMessages(groupId, streamId, 
messageCount, username));
+        return Response.success(streamService.listMessages(request, username));
     }
 
 }

Reply via email to