This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 3722431c25 [ISSUE #8458] Add more test coverage for ProcessQueue (#8459) 3722431c25 is described below commit 3722431c2593a5fc568d415d860001c690c5a5ad Author: yx9o <yangx_s...@163.com> AuthorDate: Sun Jul 28 17:11:15 2024 +0800 [ISSUE #8458] Add more test coverage for ProcessQueue (#8459) * [ISSUE #8458] Add more test coverage for ProcessQueue * Update * Update * Update --- .../client/impl/consumer/ProcessQueueTest.java | 82 ++++++++++++++++++++-- 1 file changed, 75 insertions(+), 7 deletions(-) diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java index be0bd29f79..a8afd4a233 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java @@ -16,17 +16,32 @@ */ package org.apache.rocketmq.client.impl.consumer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.protocol.body.ProcessQueueInfo; import org.assertj.core.util.Lists; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.TreeMap; + import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; @RunWith(MockitoJUnitRunner.class) public class ProcessQueueTest { @@ -78,7 +93,7 @@ public class ProcessQueueTest { } @Test - public void testFillProcessQueueInfo() { + public void testFillProcessQueueInfo() throws IllegalAccessException { ProcessQueue pq = new ProcessQueue(); pq.putMessage(createMessageList(102400)); @@ -101,6 +116,57 @@ public class ProcessQueueTest { pq.commit(); pq.fillProcessQueueInfo(processQueueInfo); assertThat(processQueueInfo.getCachedMsgSizeInMiB()).isEqualTo(0); + + TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<>(); + consumingMsgOrderlyTreeMap.put(0L, createMessageList(1).get(0)); + FieldUtils.writeDeclaredField(pq, "consumingMsgOrderlyTreeMap", consumingMsgOrderlyTreeMap, true); + pq.fillProcessQueueInfo(processQueueInfo); + assertEquals(0, processQueueInfo.getTransactionMsgMinOffset()); + assertEquals(0, processQueueInfo.getTransactionMsgMaxOffset()); + assertEquals(1, processQueueInfo.getTransactionMsgCount()); + } + + @Test + public void testPopRequest() throws MQBrokerException, RemotingException, InterruptedException, MQClientException { + ProcessQueue processQueue = createProcessQueue(); + MessageExt messageExt = createMessageList(1).get(0); + messageExt.getProperties().put(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP, System.currentTimeMillis() - 20 * 60 * 1000L + ""); + processQueue.getMsgTreeMap().put(0L, messageExt); + DefaultMQPushConsumer pushConsumer = mock(DefaultMQPushConsumer.class); + processQueue.cleanExpiredMsg(pushConsumer); + verify(pushConsumer).sendMessageBack(any(MessageExt.class), eq(3)); + } + + @Test + public void testRollback() throws IllegalAccessException { + ProcessQueue processQueue = createProcessQueue(); + processQueue.rollback(); + Field consumingMsgOrderlyTreeMapField = FieldUtils.getDeclaredField(processQueue.getClass(), "consumingMsgOrderlyTreeMap", true); + TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = (TreeMap<Long, MessageExt>) consumingMsgOrderlyTreeMapField.get(processQueue); + assertEquals(0, consumingMsgOrderlyTreeMap.size()); + } + + @Test + public void testHasTempMessage() { + ProcessQueue processQueue = createProcessQueue(); + assertFalse(processQueue.hasTempMessage()); + } + + @Test + public void testProcessQueue() { + ProcessQueue processQueue1 = createProcessQueue(); + ProcessQueue processQueue2 = createProcessQueue(); + assertEquals(processQueue1.getMsgAccCnt(), processQueue2.getMsgAccCnt()); + assertEquals(processQueue1.getTryUnlockTimes(), processQueue2.getTryUnlockTimes()); + assertEquals(processQueue1.getLastLockTimestamp(), processQueue2.getLastLockTimestamp()); + assertEquals(processQueue1.getLastPullTimestamp(), processQueue2.getLastPullTimestamp()); + } + + private ProcessQueue createProcessQueue() { + ProcessQueue result = new ProcessQueue(); + result.setMsgAccCnt(1); + result.incTryUnlockTimes(); + return result; } private List<MessageExt> createMessageList() { @@ -108,13 +174,15 @@ public class ProcessQueueTest { } private List<MessageExt> createMessageList(int count) { - List<MessageExt> messageExtList = new ArrayList<>(); + List<MessageExt> result = new ArrayList<>(); for (int i = 0; i < count; i++) { MessageExt messageExt = new MessageExt(); messageExt.setQueueOffset(i); messageExt.setBody(new byte[123]); - messageExtList.add(messageExt); + messageExt.setKeys("keys" + i); + messageExt.getProperties().put(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP, System.currentTimeMillis() + ""); + result.add(messageExt); } - return messageExtList; + return result; } }