This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 2538c3414d [ISSUE #9106] Fix revive backoff retry not effective in Pop Consumption based on rocksdb (#9107)
2538c3414d is described below

commit 2538c3414d17604b930bcd52dba15edf210c4ab8
Author: Liu Shengzhong <szliu0...@gmail.com>
AuthorDate: Mon Jan 6 10:03:34 2025 +0800

    [ISSUE #9106] Fix revive backoff retry not effective in Pop Consumption based on rocksdb (#9107)
---
 .../rocketmq/broker/pop/PopConsumerService.java    | 11 ++++---
 .../broker/pop/PopConsumerServiceTest.java         | 36 ++++++++++++++++++++++
 2 files changed, 43 insertions(+), 4 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
index fb371dce05..647e3d6ff7 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
@@ -496,10 +496,13 @@ public class PopConsumerService extends ServiceThread {
                     if (record.getAttemptTimes() < brokerConfig.getPopReviveMaxAttemptTimes()) {
                         long backoffInterval = 1000L * REWRITE_INTERVALS_IN_SECONDS[
                             Math.min(REWRITE_INTERVALS_IN_SECONDS.length, record.getAttemptTimes())];
-                        record.setInvisibleTime(record.getInvisibleTime() + backoffInterval);
-                        record.setAttemptTimes(record.getAttemptTimes() + 1);
-                        failureList.add(record);
-                        log.warn("PopConsumerService revive backoff retry, record={}", record);
+                        long nextInvisibleTime = record.getInvisibleTime() + backoffInterval;
+                        PopConsumerRecord retryRecord = new PopConsumerRecord(record.getPopTime(), record.getGroupId(),
+                            record.getTopicId(), record.getQueueId(), record.getRetryFlag(), nextInvisibleTime,
+                            record.getOffset(), record.getAttemptId());
+                        retryRecord.setAttemptTimes(record.getAttemptTimes() + 1);
+                        failureList.add(retryRecord);
+                        log.warn("PopConsumerService revive backoff retry, record={}", retryRecord);
                     } else {
                         log.error("PopConsumerService drop record, message may be lost, record={}", record);
                     }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
index 5e73adb1ea..b77c170c8c 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerServiceTest.java
@@ -385,6 +385,42 @@ public class PopConsumerServiceTest {
         consumerService.shutdown();
     }
 
+    @Test
+    public void reviveBackoffRetryTest() {
+        Mockito.when(brokerController.getEscapeBridge()).thenReturn(Mockito.mock(EscapeBridge.class));
+        PopConsumerService consumerServiceSpy = Mockito.spy(consumerService);
+
+        consumerService.getPopConsumerStore().start();
+
+        long popTime = 1000000000L;
+        long invisibleTime = 60 * 1000L;
+        PopConsumerRecord record = new PopConsumerRecord();
+        record.setPopTime(popTime);
+        record.setInvisibleTime(invisibleTime);
+        record.setTopicId("topic");
+        record.setGroupId("group");
+        record.setQueueId(0);
+        record.setOffset(0);
+        consumerService.getPopConsumerStore().writeRecords(Collections.singletonList(record));
+
+        Mockito.doReturn(CompletableFuture.completedFuture(Triple.of(Mockito.mock(MessageExt.class), "", false)))
+            .when(consumerServiceSpy).getMessageAsync(any(PopConsumerRecord.class));
+        Mockito.when(brokerController.getEscapeBridge().putMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenReturn(
+            new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))
+        );
+
+        long visibleTimestamp = popTime + invisibleTime;
+
+        // revive fails
+        Assert.assertEquals(1, consumerServiceSpy.revive(visibleTimestamp, 1));
+        // should be invisible now
+        Assert.assertEquals(0, consumerService.getPopConsumerStore().scanExpiredRecords(visibleTimestamp, 1).size());
+        // will be visible again in 10 seconds
+        Assert.assertEquals(1, consumerService.getPopConsumerStore().scanExpiredRecords(visibleTimestamp + 10 * 1000, 1).size());
+
+        consumerService.shutdown();
+    }
+
     @Test
     public void transferToFsStoreTest() {
         Assert.assertNotNull(consumerService.getServiceName());

Reply via email to