This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3722431c25 [ISSUE #8458] Add more test coverage for ProcessQueue 
(#8459)
3722431c25 is described below

commit 3722431c2593a5fc568d415d860001c690c5a5ad
Author: yx9o <yangx_s...@163.com>
AuthorDate: Sun Jul 28 17:11:15 2024 +0800

    [ISSUE #8458] Add more test coverage for ProcessQueue (#8459)
    
    * [ISSUE #8458] Add more test coverage for ProcessQueue
    
    * Update
    
    * Update
    
    * Update
---
 .../client/impl/consumer/ProcessQueueTest.java     | 82 ++++++++++++++++++++--
 1 file changed, 75 insertions(+), 7 deletions(-)

diff --git 
a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java
index be0bd29f79..a8afd4a233 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java
@@ -16,17 +16,32 @@
  */
 package org.apache.rocketmq.client.impl.consumer;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.protocol.body.ProcessQueueInfo;
 import org.assertj.core.util.Lists;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.TreeMap;
+
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
 @RunWith(MockitoJUnitRunner.class)
 public class ProcessQueueTest {
@@ -78,7 +93,7 @@ public class ProcessQueueTest {
     }
 
     @Test
-    public void testFillProcessQueueInfo() {
+    public void testFillProcessQueueInfo() throws IllegalAccessException {
         ProcessQueue pq = new ProcessQueue();
         pq.putMessage(createMessageList(102400));
 
@@ -101,6 +116,57 @@ public class ProcessQueueTest {
         pq.commit();
         pq.fillProcessQueueInfo(processQueueInfo);
         assertThat(processQueueInfo.getCachedMsgSizeInMiB()).isEqualTo(0);
+
+        TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<>();
+        consumingMsgOrderlyTreeMap.put(0L, createMessageList(1).get(0));
+        FieldUtils.writeDeclaredField(pq, "consumingMsgOrderlyTreeMap", 
consumingMsgOrderlyTreeMap, true);
+        pq.fillProcessQueueInfo(processQueueInfo);
+        assertEquals(0, processQueueInfo.getTransactionMsgMinOffset());
+        assertEquals(0, processQueueInfo.getTransactionMsgMaxOffset());
+        assertEquals(1, processQueueInfo.getTransactionMsgCount());
+    }
+
+    @Test
+    public void testPopRequest() throws MQBrokerException, RemotingException, 
InterruptedException, MQClientException {
+        ProcessQueue processQueue = createProcessQueue();
+        MessageExt messageExt = createMessageList(1).get(0);
+        
messageExt.getProperties().put(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP, 
System.currentTimeMillis() - 20 * 60 * 1000L + "");
+        processQueue.getMsgTreeMap().put(0L, messageExt);
+        DefaultMQPushConsumer pushConsumer = mock(DefaultMQPushConsumer.class);
+        processQueue.cleanExpiredMsg(pushConsumer);
+        verify(pushConsumer).sendMessageBack(any(MessageExt.class), eq(3));
+    }
+
+    @Test
+    public void testRollback() throws IllegalAccessException {
+        ProcessQueue processQueue = createProcessQueue();
+        processQueue.rollback();
+        Field consumingMsgOrderlyTreeMapField = 
FieldUtils.getDeclaredField(processQueue.getClass(), 
"consumingMsgOrderlyTreeMap", true);
+        TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = (TreeMap<Long, 
MessageExt>) consumingMsgOrderlyTreeMapField.get(processQueue);
+        assertEquals(0, consumingMsgOrderlyTreeMap.size());
+    }
+
+    @Test
+    public void testHasTempMessage() {
+        ProcessQueue processQueue = createProcessQueue();
+        assertFalse(processQueue.hasTempMessage());
+    }
+
+    @Test
+    public void testProcessQueue() {
+        ProcessQueue processQueue1 = createProcessQueue();
+        ProcessQueue processQueue2 = createProcessQueue();
+        assertEquals(processQueue1.getMsgAccCnt(), 
processQueue2.getMsgAccCnt());
+        assertEquals(processQueue1.getTryUnlockTimes(), 
processQueue2.getTryUnlockTimes());
+        assertEquals(processQueue1.getLastLockTimestamp(), 
processQueue2.getLastLockTimestamp());
+        assertEquals(processQueue1.getLastPullTimestamp(), 
processQueue2.getLastPullTimestamp());
+    }
+
+    private ProcessQueue createProcessQueue() {
+        ProcessQueue result = new ProcessQueue();
+        result.setMsgAccCnt(1);
+        result.incTryUnlockTimes();
+        return result;
     }
 
     private List<MessageExt> createMessageList() {
@@ -108,13 +174,15 @@ public class ProcessQueueTest {
     }
 
     private List<MessageExt> createMessageList(int count) {
-        List<MessageExt> messageExtList = new ArrayList<>();
+        List<MessageExt> result = new ArrayList<>();
         for (int i = 0; i < count; i++) {
             MessageExt messageExt = new MessageExt();
             messageExt.setQueueOffset(i);
             messageExt.setBody(new byte[123]);
-            messageExtList.add(messageExt);
+            messageExt.setKeys("keys" + i);
+            
messageExt.getProperties().put(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP, 
System.currentTimeMillis() + "");
+            result.add(messageExt);
         }
-        return messageExtList;
+        return result;
     }
 }

Reply via email to