This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new d20a29cb14 [INLONG-10638][Manager] Data preview supports filtering function (#10639) d20a29cb14 is described below commit d20a29cb146400889958fd7b4088f92b5e4fbf8a Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Wed Jul 17 19:11:08 2024 +0800 [INLONG-10638][Manager] Data preview supports filtering function (#10639) --- .../api/inner/client/InlongStreamClient.java | 17 +++++-- .../client/api/service/InlongStreamApi.java | 5 +- .../inlong/manager/common/util/JsonUtils.java | 20 ++++++++ .../manager/pojo/stream/QueryMessageRequest.java | 54 ++++++++++++++++++++++ .../service/message/DeserializeOperator.java | 34 +++++++++++++- .../message/InlongMsgDeserializeOperator.java | 10 ++-- .../service/message/PbMsgDeserializeOperator.java | 14 ++++-- .../service/message/RawMsgDeserializeOperator.java | 11 +++-- .../resource/queue/QueueResourceOperator.java | 5 +- .../resource/queue/kafka/KafkaOperator.java | 11 +++-- .../queue/kafka/KafkaQueueResourceOperator.java | 7 +-- .../resource/queue/pulsar/PulsarOperator.java | 15 +++--- .../queue/pulsar/PulsarQueueResourceOperator.java | 11 +++-- .../queue/pulsar/QueryLatestMessagesRunnable.java | 9 ++-- .../resource/queue/tubemq/TubeMQOperator.java | 8 ++-- .../queue/tubemq/TubeMQQueueResourceOperator.java | 5 +- .../service/stream/InlongStreamService.java | 7 ++- .../service/stream/InlongStreamServiceImpl.java | 9 ++-- .../resource/queue/kafka/KafkaOperatorTest.java | 20 +++++--- .../web/controller/InlongStreamController.java | 11 ++--- 20 files changed, 209 insertions(+), 74 deletions(-) diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java index 89ca7eef68..e5573d2db1 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java @@ -21,6 +21,7 @@ import org.apache.inlong.manager.client.api.ClientConfiguration; import org.apache.inlong.manager.client.api.service.InlongStreamApi; import org.apache.inlong.manager.client.api.util.ClientUtils; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.util.JsonUtils; import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.pojo.common.BatchResult; import org.apache.inlong.manager.pojo.common.PageResult; @@ -30,11 +31,15 @@ import org.apache.inlong.manager.pojo.sink.ParseFieldRequest; import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest; +import org.apache.inlong.manager.pojo.stream.QueryMessageRequest; import org.apache.inlong.manager.pojo.stream.StreamField; +import com.fasterxml.jackson.core.type.TypeReference; import org.apache.commons.lang3.tuple.Pair; import java.util.List; +import java.util.Map; +import java.util.Objects; import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON; import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL; @@ -295,11 +300,15 @@ public class InlongStreamClient { return parseFields(request); } - public List<BriefMQMessage> queryMessage(String groupId, String streamId, Integer messageCount) { - Preconditions.expectNotBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY); - Preconditions.expectNotBlank(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY); + public List<BriefMQMessage> queryMessage(QueryMessageRequest request) { + Preconditions.expectNotBlank(request.getGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY); + Preconditions.expectNotBlank(request.getStreamId(), ErrorCodeEnum.STREAM_ID_IS_EMPTY); + Map<String, Object> requestMap = JsonUtils.parseObject(request, + new TypeReference<Map<String, Object>>() { + }); + requestMap.entrySet().removeIf(entry -> Objects.isNull(entry.getValue())); Response<List<BriefMQMessage>> response = ClientUtils.executeHttpCall( - inlongStreamApi.listMessages(groupId, streamId, messageCount)); + inlongStreamApi.listMessages(requestMap)); ClientUtils.assertRespSuccess(response); return response.getData(); } diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java index 6b47d8aeb2..ac31b09d20 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java @@ -34,8 +34,10 @@ import retrofit2.http.GET; import retrofit2.http.POST; import retrofit2.http.Path; import retrofit2.http.Query; +import retrofit2.http.QueryMap; import java.util.List; +import java.util.Map; public interface InlongStreamApi { @@ -88,6 +90,5 @@ public interface InlongStreamApi { Call<Response<List<StreamField>>> parseFields(@Body ParseFieldRequest parseFieldRequest); @GET("stream/listMessages") - Call<Response<List<BriefMQMessage>>> listMessages(@Query("groupId") String groupId, - @Query("streamId") String streamId, @Query("messageCount") Integer messageCount); + Call<Response<List<BriefMQMessage>>> listMessages(@QueryMap Map<String, Object> query); } diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/JsonUtils.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/JsonUtils.java index de7f5647c1..8f21ac6364 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/JsonUtils.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/JsonUtils.java @@ -121,6 +121,26 @@ public class JsonUtils { } } + /** + * Parse Object to Java object. + * <p/> + * This method enhancements to {@link #parseObject(Object, TypeReference)}, + * as the above method can not solve this situation: + * + * @param object object + * @param typeReference The generic type is actually the parsed java type + * @return java object; + * @throws JsonException when parse error + */ + public static <T> T parseObject(Object object, TypeReference<T> typeReference) { + try { + return OBJECT_MAPPER.convertValue(object, typeReference); + } catch (Exception e) { + log.error("json parse err for: " + object, e); + throw new JsonException(e); + } + } + /** * Parse JSON string to Java object. * <p/> diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/QueryMessageRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/QueryMessageRequest.java new file mode 100644 index 0000000000..3c939222e5 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/stream/QueryMessageRequest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.stream; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Query message request + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@ApiModel("query message request") +public class QueryMessageRequest { + + @ApiModelProperty(value = "Inlong group id") + private String groupId; + + @ApiModelProperty(value = "Inlong stream id") + private String streamId; + + @ApiModelProperty(value = "Message count") + private Integer messageCount = 100; + + @ApiModelProperty(value = "Field name") + private String fieldName; + + @ApiModelProperty(value = "Operation type, like !=, =, like") + private String operationType; + + @ApiModelProperty(value = "TargetValue") + private String targetValue; +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperator.java index c62d3f6862..1fb898da99 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperator.java @@ -21,10 +21,15 @@ import org.apache.inlong.common.enums.MessageWrapType; import org.apache.inlong.common.pojo.sort.dataflow.deserialization.DeserializationConfig; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.pojo.consume.BriefMQMessage; +import org.apache.inlong.manager.pojo.consume.BriefMQMessage.FieldInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; +import org.apache.inlong.manager.pojo.stream.QueryMessageRequest; + +import org.apache.commons.lang3.StringUtils; import java.util.List; import java.util.Map; +import java.util.Objects; /** * Deserialize of message operator @@ -55,8 +60,8 @@ public interface DeserializeOperator { * @param index message index * @return list of brief mq message info */ - default List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo, - byte[] msgBytes, Map<String, String> headers, int index) throws Exception { + default List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo, List<BriefMQMessage> briefMQMessages, + byte[] msgBytes, Map<String, String> headers, int index, QueryMessageRequest request) throws Exception { return null; } @@ -64,4 +69,29 @@ public interface DeserializeOperator { throw new BusinessException(String.format("current type not support DeserializationInfo for wrapType=%s", streamInfo.getWrapType())); } + + default Boolean checkIfFilter(QueryMessageRequest request, List<FieldInfo> streamFieldList) { + if (StringUtils.isBlank(request.getFieldName()) || StringUtils.isBlank(request.getOperationType()) + || StringUtils.isBlank(request.getTargetValue())) { + return false; + } + boolean ifFilter = false; + FieldInfo fieldInfo = streamFieldList.stream() + .filter(v -> Objects.equals(v.getFieldName(), request.getFieldName())).findFirst() + .orElse(null); + if (fieldInfo != null) { + switch (request.getOperationType()) { + case "=": + ifFilter = !Objects.equals(request.getTargetValue(), fieldInfo.getFieldValue()); + break; + case "!=": + ifFilter = Objects.equals(request.getTargetValue(), fieldInfo.getFieldValue()); + break; + case "like": + ifFilter = fieldInfo.getFieldValue() != null + && !fieldInfo.getFieldValue().contains(request.getTargetValue()); + } + } + return ifFilter; + } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java index b04931db0f..10778bbc20 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/InlongMsgDeserializeOperator.java @@ -28,6 +28,7 @@ import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.pojo.consume.BriefMQMessage; import org.apache.inlong.manager.pojo.consume.BriefMQMessage.FieldInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; +import org.apache.inlong.manager.pojo.stream.QueryMessageRequest; import org.apache.inlong.manager.service.datatype.DataTypeOperator; import org.apache.inlong.manager.service.datatype.DataTypeOperatorFactory; @@ -36,7 +37,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.nio.charset.Charset; -import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -55,11 +55,10 @@ public class InlongMsgDeserializeOperator implements DeserializeOperator { } @Override - public List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo, byte[] msgBytes, Map<String, String> headers, - int index) { + public List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo, List<BriefMQMessage> messageList, + byte[] msgBytes, Map<String, String> headers, int index, QueryMessageRequest request) { String groupId = headers.get(AttributeConstants.GROUP_ID); String streamId = headers.get(AttributeConstants.STREAM_ID); - List<BriefMQMessage> messageList = new ArrayList<>(); InLongMsg inLongMsg = InLongMsg.parseFrom(msgBytes); for (String attr : inLongMsg.getAttrs()) { Map<String, String> attrMap = StringUtil.splitKv(attr, INLONGMSG_ATTR_ENTRY_DELIMITER, @@ -87,6 +86,9 @@ public class InlongMsgDeserializeOperator implements DeserializeOperator { DataTypeOperator dataTypeOperator = dataTypeOperatorFactory.getInstance(DataTypeEnum.forType(streamInfo.getDataType())); List<FieldInfo> streamFieldList = dataTypeOperator.parseFields(body, streamInfo); + if (checkIfFilter(request, streamFieldList)) { + continue; + } BriefMQMessage message = BriefMQMessage.builder() .id(index) .inlongGroupId(groupId) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java index 79760cf737..e8e32ab80a 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/PbMsgDeserializeOperator.java @@ -27,6 +27,7 @@ import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.pojo.consume.BriefMQMessage; import org.apache.inlong.manager.pojo.consume.BriefMQMessage.FieldInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; +import org.apache.inlong.manager.pojo.stream.QueryMessageRequest; import org.apache.inlong.manager.service.datatype.DataTypeOperator; import org.apache.inlong.manager.service.datatype.DataTypeOperatorFactory; import org.apache.inlong.sdk.commons.protocol.ProxySdk.INLONG_COMPRESSED_TYPE; @@ -58,8 +59,8 @@ public class PbMsgDeserializeOperator implements DeserializeOperator { } @Override - public List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo, - byte[] msgBytes, Map<String, String> headers, int index) throws Exception { + public List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo, List<BriefMQMessage> briefMQMessages, + byte[] msgBytes, Map<String, String> headers, int index, QueryMessageRequest request) throws Exception { int compressType = Integer.parseInt(headers.getOrDefault(COMPRESS_TYPE_KEY, "0")); byte[] values = msgBytes; switch (compressType) { @@ -74,10 +75,12 @@ public class PbMsgDeserializeOperator implements DeserializeOperator { default: throw new IllegalArgumentException("Unknown compress type:" + compressType); } - return transformMessageObjs(MessageObjs.parseFrom(values), streamInfo, index); + briefMQMessages.addAll(transformMessageObjs(MessageObjs.parseFrom(values), streamInfo, index, request)); + return briefMQMessages; } - private List<BriefMQMessage> transformMessageObjs(MessageObjs messageObjs, InlongStreamInfo streamInfo, int index) { + private List<BriefMQMessage> transformMessageObjs(MessageObjs messageObjs, InlongStreamInfo streamInfo, int index, + QueryMessageRequest request) { if (null == messageObjs) { return null; } @@ -96,6 +99,9 @@ public class PbMsgDeserializeOperator implements DeserializeOperator { DataTypeOperator dataTypeOperator = dataTypeOperatorFactory.getInstance(DataTypeEnum.forType(streamInfo.getDataType())); List<FieldInfo> streamFieldList = dataTypeOperator.parseFields(body, streamInfo); + if (checkIfFilter(request, streamFieldList)) { + continue; + } BriefMQMessage message = BriefMQMessage.builder() .id(index) .inlongGroupId(headers.get(AttributeConstants.GROUP_ID)) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java index db025d2c2f..d55c7d6cad 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java @@ -24,6 +24,7 @@ import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.pojo.consume.BriefMQMessage; import org.apache.inlong.manager.pojo.consume.BriefMQMessage.FieldInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; +import org.apache.inlong.manager.pojo.stream.QueryMessageRequest; import org.apache.inlong.manager.service.datatype.DataTypeOperator; import org.apache.inlong.manager.service.datatype.DataTypeOperatorFactory; @@ -49,8 +50,8 @@ public class RawMsgDeserializeOperator implements DeserializeOperator { } @Override - public List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo, - byte[] msgBytes, Map<String, String> headers, int index) { + public List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo, List<BriefMQMessage> briefMQMessages, + byte[] msgBytes, Map<String, String> headers, int index, QueryMessageRequest request) { String groupId = headers.get(AttributeConstants.GROUP_ID); String streamId = headers.get(AttributeConstants.STREAM_ID); long msgTime = Long.parseLong(headers.getOrDefault(MSG_TIME_KEY, "0")); @@ -59,6 +60,9 @@ public class RawMsgDeserializeOperator implements DeserializeOperator { DataTypeOperator dataTypeOperator = dataTypeOperatorFactory.getInstance(DataTypeEnum.forType(streamInfo.getDataType())); List<FieldInfo> fieldList = dataTypeOperator.parseFields(body, streamInfo); + if (checkIfFilter(request, fieldList)) { + return briefMQMessages; + } BriefMQMessage briefMQMessage = BriefMQMessage.builder() .id(index) .inlongGroupId(groupId) @@ -69,7 +73,8 @@ public class RawMsgDeserializeOperator implements DeserializeOperator { .body(body) .fieldList(fieldList) .build(); - return Collections.singletonList(briefMQMessage); + briefMQMessages.addAll(Collections.singletonList(briefMQMessage)); + return briefMQMessages; } catch (Exception e) { String errMsg = String.format("decode msg failed for groupId=%s, streamId=%s", groupId, streamId); log.error(errMsg, e); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/QueueResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/QueueResourceOperator.java index 4664f670a6..528884c996 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/QueueResourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/QueueResourceOperator.java @@ -22,6 +22,7 @@ import org.apache.inlong.manager.dao.entity.StreamSinkEntity; import org.apache.inlong.manager.pojo.consume.BriefMQMessage; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; +import org.apache.inlong.manager.pojo.stream.QueryMessageRequest; import javax.validation.constraints.NotBlank; import javax.validation.constraints.NotNull; @@ -81,12 +82,12 @@ public interface QueueResourceOperator { * * @param groupInfo inlong group info * @param streamInfo inlong stream info - * @param messageCount count of messages to query + * @param request query message request * @throws Exception any exception if occurred * @return query brief mq message info */ default List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, - Integer messageCount) throws Exception { + QueryMessageRequest request) throws Exception { return null; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java index 763a654c26..b49e52317a 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java @@ -24,6 +24,7 @@ import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo; import org.apache.inlong.manager.pojo.consume.BriefMQMessage; import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; +import org.apache.inlong.manager.pojo.stream.QueryMessageRequest; import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl; import org.apache.inlong.manager.service.message.DeserializeOperator; import org.apache.inlong.manager.service.message.DeserializeOperatorFactory; @@ -113,19 +114,19 @@ public class KafkaOperator { * Query topic message for the given Kafka cluster. */ public List<BriefMQMessage> queryLatestMessage(KafkaClusterInfo clusterInfo, String topicName, - String consumeGroup, Integer messageCount, InlongStreamInfo streamInfo) { + String consumeGroup, InlongStreamInfo streamInfo, QueryMessageRequest request) { LOGGER.debug("begin to query message for topic {} in cluster: {}", topicName, clusterInfo); Properties properties = getProperties(clusterInfo.getUrl(), consumeGroup); KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties); - return getLatestMessage(consumer, topicName, messageCount, streamInfo); + return getLatestMessage(consumer, topicName, streamInfo, request); } @VisibleForTesting public List<BriefMQMessage> getLatestMessage(Consumer<byte[], byte[]> consumer, String topicName, - Integer messageCount, InlongStreamInfo streamInfo) { + InlongStreamInfo streamInfo, QueryMessageRequest request) { List<BriefMQMessage> messageList = new ArrayList<>(); - + Integer messageCount = request.getMessageCount(); try { List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topicName); List<TopicPartition> topicPartitionList = partitionInfoList.stream() @@ -162,7 +163,7 @@ public class KafkaOperator { MessageWrapType.valueOf(Integer.parseInt(headers.get(InlongConstants.MSG_ENCODE_VER))); } DeserializeOperator deserializeOperator = deserializeOperatorFactory.getInstance(messageWrapType); - messageList.addAll(deserializeOperator.decodeMsg(streamInfo, record.value(), headers, index)); + deserializeOperator.decodeMsg(streamInfo, messageList, record.value(), headers, index, request); if (messageList.size() >= messageCount) { break; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaQueueResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaQueueResourceOperator.java index cc0fa83dd2..6708bd8ecd 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaQueueResourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaQueueResourceOperator.java @@ -32,6 +32,7 @@ import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; +import org.apache.inlong.manager.pojo.stream.QueryMessageRequest; import org.apache.inlong.manager.service.cluster.InlongClusterService; import org.apache.inlong.manager.service.consume.InlongConsumeService; import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator; @@ -192,7 +193,7 @@ public class KafkaQueueResourceOperator implements QueueResourceOperator { @Override public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, - Integer messageCount) { + QueryMessageRequest request) { ClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.KAFKA); String topicName = streamInfo.getMqResource(); @@ -204,8 +205,8 @@ public class KafkaQueueResourceOperator implements QueueResourceOperator { String consumeGroup = String.format(KAFKA_CONSUMER_GROUP_REALTIME_REVIEW, groupInfo.getInlongClusterTag(), topicName); - return kafkaOperator.queryLatestMessage((KafkaClusterInfo) clusterInfo, topicName, consumeGroup, messageCount, - streamInfo); + return kafkaOperator.queryLatestMessage((KafkaClusterInfo) clusterInfo, topicName, consumeGroup, streamInfo, + request); } @Override diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java index 5c9e9f5d10..e5e8d4c815 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java @@ -34,6 +34,7 @@ import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTenantInfo; import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo; import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicMetadata; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; +import org.apache.inlong.manager.pojo.stream.QueryMessageRequest; import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl; import org.apache.inlong.manager.service.message.DeserializeOperator; import org.apache.inlong.manager.service.message.DeserializeOperatorFactory; @@ -409,16 +410,16 @@ public class PulsarOperator { * Query topic message for the given pulsar cluster. */ public List<BriefMQMessage> queryLatestMessage(PulsarClusterInfo pulsarClusterInfo, String topicFullName, - Integer messageCount, InlongStreamInfo streamInfo, boolean serial) { + QueryMessageRequest request, InlongStreamInfo streamInfo, boolean serial) { LOGGER.info("begin to query message for topic {}", topicFullName); List<BriefMQMessage> messageList = new ArrayList<>(); int partitionCount = getPartitionCount(pulsarClusterInfo, topicFullName); - for (int messageIndex = 0; messageIndex < messageCount; messageIndex++) { + for (int messageIndex = 0; messageIndex < 100; messageIndex++) { int currentPartitionNum = messageIndex % partitionCount; int messagePosition = messageIndex / partitionCount + 1; String topicNameOfPartition = buildTopicNameOfPartition(topicFullName, currentPartitionNum, serial); - messageList.addAll(queryMessageFromPulsar(topicNameOfPartition, pulsarClusterInfo, messageIndex, - streamInfo, messagePosition)); + messageList.addAll(queryMessageFromPulsar(topicNameOfPartition, pulsarClusterInfo, messageIndex, streamInfo, + messagePosition, request)); } LOGGER.info("success query message for topic={}", topicFullName); return messageList; @@ -444,8 +445,7 @@ public class PulsarOperator { * Query pulsar message. */ private List<BriefMQMessage> queryMessageFromPulsar(String topicPartition, PulsarClusterInfo pulsarClusterInfo, - int index, - InlongStreamInfo streamInfo, int messagePosition) { + int index, InlongStreamInfo streamInfo, int messagePosition, QueryMessageRequest request) { List<BriefMQMessage> briefMQMessages = new ArrayList<>(); try { ResponseEntity<byte[]> httpResponse = @@ -462,8 +462,7 @@ public class PulsarOperator { MessageWrapType.valueOf(Integer.parseInt(headers.get(InlongConstants.MSG_ENCODE_VER))); } DeserializeOperator deserializeOperator = deserializeOperatorFactory.getInstance(messageWrapType); - briefMQMessages.addAll(deserializeOperator.decodeMsg(streamInfo, messageInfo.getBody(), - headers, index)); + deserializeOperator.decodeMsg(streamInfo, briefMQMessages, messageInfo.getBody(), headers, index, request); } catch (Exception e) { LOGGER.warn("query message from pulsar error for groupId = {}, streamId = {}", streamInfo.getInlongGroupId(), diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java index 6d86760ea4..efbdeabf20 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java @@ -36,6 +36,7 @@ import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo; import org.apache.inlong.manager.pojo.sink.StreamSink; import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; +import org.apache.inlong.manager.pojo.stream.QueryMessageRequest; import org.apache.inlong.manager.service.cluster.InlongClusterService; import org.apache.inlong.manager.service.consume.InlongConsumeService; import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator; @@ -305,23 +306,23 @@ public class PulsarQueueResourceOperator implements QueueResourceOperator { /** * Query latest message from pulsar */ - public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo, - InlongStreamInfo streamInfo, Integer messageCount) throws Exception { + public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, + QueryMessageRequest request) throws Exception { List<ClusterInfo> pulsarClusterList = clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), ClusterType.PULSAR); List<BriefMQMessage> briefMQMessages = Collections.synchronizedList(new ArrayList<>()); - QueryCountDownLatch queryLatch = new QueryCountDownLatch(messageCount, pulsarClusterList.size()); + QueryCountDownLatch queryLatch = new QueryCountDownLatch(request.getMessageCount(), pulsarClusterList.size()); InlongPulsarInfo inlongPulsarInfo = ((InlongPulsarInfo) groupInfo); for (ClusterInfo clusterInfo : pulsarClusterList) { QueryLatestMessagesRunnable task = new QueryLatestMessagesRunnable(inlongPulsarInfo, streamInfo, - (PulsarClusterInfo) clusterInfo, pulsarOperator, messageCount, briefMQMessages, queryLatch); + (PulsarClusterInfo) clusterInfo, pulsarOperator, request, briefMQMessages, queryLatch); this.executor.execute(task); } queryLatch.await(30, TimeUnit.SECONDS); log.info("success query pulsar message for groupId={}, streamId={}", streamInfo.getInlongGroupId(), streamInfo.getInlongStreamId()); - int finalMsgCount = Math.min(messageCount, briefMQMessages.size()); + int finalMsgCount = Math.min(request.getMessageCount(), briefMQMessages.size()); if (finalMsgCount > 0) { return briefMQMessages.subList(0, finalMsgCount); } else { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java index caf61e0ebe..4fb6b58e49 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java @@ -22,6 +22,7 @@ import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo; import org.apache.inlong.manager.pojo.consume.BriefMQMessage; import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; +import org.apache.inlong.manager.pojo.stream.QueryMessageRequest; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -37,7 +38,7 @@ public class QueryLatestMessagesRunnable implements Runnable { private InlongStreamInfo streamInfo; private PulsarClusterInfo clusterInfo; private PulsarOperator pulsarOperator; - private Integer messageCount; + private QueryMessageRequest queryMessageRequest; private List<BriefMQMessage> briefMQMessages; private QueryCountDownLatch latch; @@ -45,14 +46,14 @@ public class QueryLatestMessagesRunnable implements Runnable { InlongStreamInfo streamInfo, PulsarClusterInfo clusterInfo, PulsarOperator pulsarOperator, - Integer messageCount, + QueryMessageRequest queryMessageRequest, List<BriefMQMessage> briefMQMessages, QueryCountDownLatch latch) { this.inlongPulsarInfo = inlongPulsarInfo; this.streamInfo = streamInfo; this.clusterInfo = clusterInfo; this.pulsarOperator = pulsarOperator; - this.messageCount = messageCount; + this.queryMessageRequest = queryMessageRequest; this.briefMQMessages = briefMQMessages; this.latch = latch; } @@ -69,7 +70,7 @@ public class QueryLatestMessagesRunnable implements Runnable { String fullTopicName = tenant + "/" + namespace + "/" + topicName; boolean serial = InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(inlongPulsarInfo.getQueueModule()); List<BriefMQMessage> messages = - pulsarOperator.queryLatestMessage(clusterInfo, fullTopicName, messageCount, streamInfo, serial); + pulsarOperator.queryLatestMessage(clusterInfo, fullTopicName, queryMessageRequest, streamInfo, serial); if (CollectionUtils.isNotEmpty(messages)) { briefMQMessages.addAll(messages); this.latch.countDown(messages.size()); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java index cdbc4da86d..050de07805 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQOperator.java @@ -30,6 +30,7 @@ import org.apache.inlong.manager.pojo.queue.tubemq.TubeHttpResponse; import org.apache.inlong.manager.pojo.queue.tubemq.TubeMessageResponse; import org.apache.inlong.manager.pojo.queue.tubemq.TubeMessageResponse.TubeDataInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; +import org.apache.inlong.manager.pojo.stream.QueryMessageRequest; import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl; import org.apache.inlong.manager.service.message.DeserializeOperator; import org.apache.inlong.manager.service.message.DeserializeOperatorFactory; @@ -269,7 +270,7 @@ public class TubeMQOperator { * Query topic message for the given tubemq cluster. */ public List<BriefMQMessage> queryLastMessage(TubeClusterInfo tubeCluster, String topicName, - Integer msgCount, InlongStreamInfo streamInfo) { + InlongStreamInfo streamInfo, QueryMessageRequest request) { LOGGER.info("begin to query message for topic {} in cluster: {}", topicName, tubeCluster); String masterUrl = tubeCluster.getMasterWebUrl(); TubeBrokerInfo brokerView = this.getBrokerInfo(masterUrl); @@ -286,7 +287,8 @@ public class TubeMQOperator { throw new BusinessException("TubeMQ master url or TubeMQ topic cannot be null"); } - String url = "http://" + brokerUrl + QUERY_MESSAGE_PATH + TOPIC_NAME + topicName + MSG_COUNT + msgCount; + String url = "http://" + brokerUrl + QUERY_MESSAGE_PATH + TOPIC_NAME + topicName + MSG_COUNT + + request.getMessageCount(); TubeMessageResponse response = HttpUtils.request(restTemplate, url, HttpMethod.GET, null, new HttpHeaders(), TubeMessageResponse.class); if (response.getErrCode() != SUCCESS_CODE && response.getErrCode() != 200) { @@ -310,7 +312,7 @@ public class TubeMQOperator { } byte[] messageData = Base64.getDecoder().decode(tubeDataInfo.getData()); DeserializeOperator deserializeOperator = deserializeOperatorFactory.getInstance(messageWrapType); - messageList.addAll(deserializeOperator.decodeMsg(streamInfo, messageData, map, index)); + deserializeOperator.decodeMsg(streamInfo, messageList, messageData, map, index, request); } LOGGER.info("success query messages for topic={}", topicName); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQQueueResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQQueueResourceOperator.java index f6cf034e67..14dbd4f202 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQQueueResourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQQueueResourceOperator.java @@ -30,6 +30,7 @@ import org.apache.inlong.manager.pojo.consume.BriefMQMessage; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.sink.StreamSink; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; +import org.apache.inlong.manager.pojo.stream.QueryMessageRequest; import org.apache.inlong.manager.service.cluster.InlongClusterService; import org.apache.inlong.manager.service.consume.InlongConsumeService; import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator; @@ -132,14 +133,14 @@ public class TubeMQQueueResourceOperator implements QueueResourceOperator { } public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, - Integer messageCount) { + QueryMessageRequest request) { Preconditions.expectNotNull(groupInfo, "inlong group info cannot be null"); String clusterTag = groupInfo.getInlongClusterTag(); TubeClusterInfo tubeCluster = (TubeClusterInfo) clusterService.getOne(clusterTag, null, ClusterType.TUBEMQ); String topicName = groupInfo.getMqResource(); - return tubeMQOperator.queryLastMessage(tubeCluster, topicName, messageCount, streamInfo); + return tubeMQOperator.queryLastMessage(tubeCluster, topicName, streamInfo, request); } @Override diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java index 96002d0ebd..013a6546b5 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java @@ -27,6 +27,7 @@ import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest; import org.apache.inlong.manager.pojo.stream.InlongStreamRequest; +import org.apache.inlong.manager.pojo.stream.QueryMessageRequest; import org.apache.inlong.manager.pojo.stream.StreamField; import org.apache.inlong.manager.pojo.user.UserInfo; @@ -241,12 +242,10 @@ public interface InlongStreamService { /** * List brief mq message info * - * @param groupId inlong group id - * @param streamId inlong stream id - * @param messageCount Count of messages to query' + * @param request query message request * @param operator operator * @return list of brief mq message info */ - List<BriefMQMessage> listMessages(String groupId, String streamId, Integer messageCount, String operator); + List<BriefMQMessage> listMessages(QueryMessageRequest request, String operator); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java index 46db20a04f..93692aeb28 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java @@ -55,6 +55,7 @@ import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest; import org.apache.inlong.manager.pojo.stream.InlongStreamRequest; +import org.apache.inlong.manager.pojo.stream.QueryMessageRequest; import org.apache.inlong.manager.pojo.stream.StreamField; import org.apache.inlong.manager.pojo.user.UserInfo; import org.apache.inlong.manager.pojo.user.UserRoleCode; @@ -948,15 +949,15 @@ public class InlongStreamServiceImpl implements InlongStreamService { } @Override - public List<BriefMQMessage> listMessages(String groupId, String streamId, Integer messageCount, String operator) { - InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId); + public List<BriefMQMessage> listMessages(QueryMessageRequest request, String operator) { + InlongGroupEntity groupEntity = groupMapper.selectByGroupId(request.getGroupId()); InlongGroupOperator instance = groupOperatorFactory.getInstance(groupEntity.getMqType()); InlongGroupInfo groupInfo = instance.getFromEntity(groupEntity); - InlongStreamInfo inlongStreamInfo = get(groupId, streamId); + InlongStreamInfo inlongStreamInfo = get(request.getGroupId(), request.getStreamId()); List<BriefMQMessage> messageList = new ArrayList<>(); QueueResourceOperator queueOperator = queueOperatorFactory.getInstance(groupEntity.getMqType()); try { - messageList = queueOperator.queryLatestMessages(groupInfo, inlongStreamInfo, messageCount); + messageList = queueOperator.queryLatestMessages(groupInfo, inlongStreamInfo, request); } catch (Exception e) { LOGGER.error("query message error ", e); } diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperatorTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperatorTest.java index acc6ebb3f5..d0b88d0ec7 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperatorTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperatorTest.java @@ -22,6 +22,7 @@ import org.apache.inlong.common.enums.MessageWrapType; import org.apache.inlong.common.msg.InLongMsg; import org.apache.inlong.manager.pojo.consume.BriefMQMessage; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; +import org.apache.inlong.manager.pojo.stream.QueryMessageRequest; import org.apache.inlong.manager.service.ServiceBaseTest; import com.google.common.collect.Lists; @@ -116,15 +117,18 @@ public class KafkaOperatorTest extends ServiceBaseTest { @Test void testGetKafkaLatestMessage() { - List<BriefMQMessage> messages = kafkaOperator.getLatestMessage(consumer, TOPIC_NAME, 10, streamInfo); + QueryMessageRequest request = new QueryMessageRequest(); + request.setMessageCount(10); + List<BriefMQMessage> messages = kafkaOperator.getLatestMessage(consumer, TOPIC_NAME, streamInfo, request); Assertions.assertEquals(0, messages.size()); } @Test void testGetKafkaLatestMessage_1() { addRecord(Collections.singletonList("inlong")); - - List<BriefMQMessage> messages = kafkaOperator.getLatestMessage(consumer, TOPIC_NAME, 10, streamInfo); + QueryMessageRequest request = new QueryMessageRequest(); + request.setMessageCount(10); + List<BriefMQMessage> messages = kafkaOperator.getLatestMessage(consumer, TOPIC_NAME, streamInfo, request); Assertions.assertEquals(1, messages.size()); Assertions.assertEquals("inlong", messages.get(0).getBody()); } @@ -133,8 +137,9 @@ public class KafkaOperatorTest extends ServiceBaseTest { void testGetKafkaLatestMessage_2() { List<String> records = IntStream.range(0, 9).mapToObj(index -> "name_" + index).collect(Collectors.toList()); addRecord(records); - - List<BriefMQMessage> messages = kafkaOperator.getLatestMessage(consumer, TOPIC_NAME, 10, streamInfo); + QueryMessageRequest request = new QueryMessageRequest(); + request.setMessageCount(10); + List<BriefMQMessage> messages = kafkaOperator.getLatestMessage(consumer, TOPIC_NAME, streamInfo, request); Assertions.assertEquals(9, messages.size()); } @@ -142,8 +147,9 @@ public class KafkaOperatorTest extends ServiceBaseTest { void testGetKafkaLatestMessage_4() { List<String> records = IntStream.range(0, 21).mapToObj(index -> "name_" + index).collect(Collectors.toList()); addRecord(records); - - List<BriefMQMessage> messages = kafkaOperator.getLatestMessage(consumer, TOPIC_NAME, 10, streamInfo); + QueryMessageRequest request = new QueryMessageRequest(); + request.setMessageCount(10); + List<BriefMQMessage> messages = kafkaOperator.getLatestMessage(consumer, TOPIC_NAME, streamInfo, request); Assertions.assertEquals(10, messages.size()); } diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java index ce2d9175c8..e5e0dd5343 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java @@ -33,6 +33,7 @@ import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamPageRequest; import org.apache.inlong.manager.pojo.stream.InlongStreamRequest; +import org.apache.inlong.manager.pojo.stream.QueryMessageRequest; import org.apache.inlong.manager.pojo.stream.StreamField; import org.apache.inlong.manager.pojo.user.LoginUserUtils; import org.apache.inlong.manager.pojo.user.UserRoleCode; @@ -255,15 +256,9 @@ public class InlongStreamController { @RequestMapping(value = "/stream/listMessages", method = RequestMethod.GET) @ApiOperation(value = "Get inlong stream message") - @ApiImplicitParams({ - @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required = true), - @ApiImplicitParam(name = "streamId", dataTypeClass = String.class, required = true), - @ApiImplicitParam(name = "messageCount", dataTypeClass = String.class, required = true) - }) - public Response<List<BriefMQMessage>> listMessages(@RequestParam String groupId, @RequestParam String streamId, - @RequestParam Integer messageCount) { + public Response<List<BriefMQMessage>> listMessages(QueryMessageRequest request) { String username = LoginUserUtils.getLoginUser().getName(); - return Response.success(streamService.listMessages(groupId, streamId, messageCount, username)); + return Response.success(streamService.listMessages(request, username)); } }