This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new cf4234b288 [ISSUE #8392] add tests for QueryMessageProcessor
cf4234b288 is described below

commit cf4234b288ff3ee64446e1d776052fcb0c87510a
Author: Tan Xiang <82364837+tanxiang...@users.noreply.github.com>
AuthorDate: Thu Jul 18 11:26:33 2024 +0800

    [ISSUE #8392] add tests for QueryMessageProcessor
---
 .../processor/QueryMessageProcessorTest.java       | 131 +++++++++++++++++++++
 1 file changed, 131 insertions(+)

diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/QueryMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/QueryMessageProcessorTest.java
new file mode 100644
index 0000000000..0fd54df7d8
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/QueryMessageProcessorTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.processor;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.header.QueryMessageRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.ViewMessageRequestHeader;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.QueryMessageResult;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.HashMap;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class QueryMessageProcessorTest {
+    private QueryMessageProcessor queryMessageProcessor;
+    @Spy
+    private BrokerController brokerController = new BrokerController(new 
BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new 
MessageStoreConfig());
+
+    @Mock
+    private MessageStore messageStore;
+
+    @Mock
+    private ChannelHandlerContext handlerContext;
+
+    @Mock
+    private Channel channel;
+
+    @Mock
+    private ChannelFuture channelFuture;
+
+    @Before
+    public void init() {
+        when(handlerContext.channel()).thenReturn(channel);
+        queryMessageProcessor = new QueryMessageProcessor(brokerController);
+        when(brokerController.getMessageStore()).thenReturn(messageStore);
+        when(channel.writeAndFlush(any())).thenReturn(channelFuture);
+    }
+
+    @Test
+    public void testQueryMessage() throws RemotingCommandException {
+        QueryMessageResult result = new QueryMessageResult();
+        result.setIndexLastUpdateTimestamp(100);
+        result.setIndexLastUpdatePhyoffset(0);
+        result.addMessage(new SelectMappedBufferResult(0, null, 0, null));
+
+        
when(messageStore.queryMessage(anyString(),anyString(),anyInt(),anyLong(),anyLong())).thenReturn(result);
+        RemotingCommand request = createQueryMessageRequest("topic", "msgKey", 
1, 100, 200,"false");
+        request.makeCustomHeaderToNet();
+        RemotingCommand response = 
queryMessageProcessor.processRequest(handlerContext, request);
+        Assert.assertEquals(response.getCode(), ResponseCode.QUERY_NOT_FOUND);
+
+        result.addMessage(new SelectMappedBufferResult(0, null, 1, null));
+        
when(messageStore.queryMessage(anyString(),anyString(),anyInt(),anyLong(),anyLong())).thenReturn(result);
+        response = queryMessageProcessor.processRequest(handlerContext, 
request);
+        Assert.assertNull(response);
+    }
+
+    @Test
+    public void testViewMessageById() throws RemotingCommandException {
+        ViewMessageRequestHeader viewMessageRequestHeader = new 
ViewMessageRequestHeader();
+        viewMessageRequestHeader.setTopic("topic");
+        viewMessageRequestHeader.setOffset(0L);
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.VIEW_MESSAGE_BY_ID, 
viewMessageRequestHeader);
+        request.makeCustomHeaderToNet();
+        request.setCode(RequestCode.VIEW_MESSAGE_BY_ID);
+
+        
when(messageStore.selectOneMessageByOffset(anyLong())).thenReturn(null);
+        RemotingCommand response = 
queryMessageProcessor.processRequest(handlerContext, request);
+        Assert.assertEquals(response.getCode(), ResponseCode.SYSTEM_ERROR);
+
+        when(messageStore.selectOneMessageByOffset(anyLong())).thenReturn(new 
SelectMappedBufferResult(0, null, 0, null));
+        response = queryMessageProcessor.processRequest(handlerContext, 
request);
+        Assert.assertNull(response);
+    }
+
+    private RemotingCommand createQueryMessageRequest(String topic, String 
key, int maxNum, long beginTimestamp, long endTimestamp,String flag) {
+        QueryMessageRequestHeader requestHeader = new 
QueryMessageRequestHeader();
+        requestHeader.setTopic(topic);
+        requestHeader.setKey(key);
+        requestHeader.setMaxNum(maxNum);
+        requestHeader.setBeginTimestamp(beginTimestamp);
+        requestHeader.setEndTimestamp(endTimestamp);
+
+        HashMap<String, String> extFields = new HashMap<>();
+        extFields.put(MixAll.UNIQUE_MSG_QUERY_FLAG, flag);
+
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.QUERY_MESSAGE, requestHeader);
+        request.setExtFields(extFields);
+        return request;
+    }
+}

Reply via email to