This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 524ca4e862 [ISSUE #8460] Improve the pop revive process when reading biz messages from a remote broker - part2 (#8494)
524ca4e862 is described below

commit 524ca4e862b3bb4b5f713ba09dac106beff0ce98
Author: imzs <i...@foxmail.com>
AuthorDate: Wed Aug 14 13:48:44 2024 +0800

    [ISSUE #8460] Improve the pop revive process when reading biz messages from a remote broker - part2 (#8494)
---
 .../rocketmq/broker/failover/EscapeBridge.java     |  39 ++++++--
 .../broker/processor/PopReviveService.java         |  22 ++---
 .../rocketmq/broker/failover/EscapeBridgeTest.java | 106 +++++++++++++++++++--
 .../broker/processor/PopReviveServiceTest.java     |  45 ++++++++-
 .../org/apache/rocketmq/common/BrokerConfig.java   |  11 +++
 5 files changed, 196 insertions(+), 27 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
index 7df49f8c47..762d917d64 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
@@ -48,9 +48,11 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.store.GetMessageResult;
+import org.apache.rocketmq.store.GetMessageStatus;
 import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.PutMessageStatus;
+import org.apache.rocketmq.tieredstore.TieredMessageStore;
 
 public class EscapeBridge {
     protected static final Logger LOG = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -99,7 +101,7 @@ public class EscapeBridge {
 
             try {
                 messageExt.setWaitStoreMsgOK(false);
-                final SendResult sendResult = 
putMessageToRemoteBroker(messageExt);
+                final SendResult sendResult = 
putMessageToRemoteBroker(messageExt, null);
                 return transformSendResult2PutResult(sendResult);
             } catch (Exception e) {
                 LOG.error("sendMessageInFailover to remote failed", e);
@@ -112,7 +114,10 @@ public class EscapeBridge {
         }
     }
 
-    private SendResult putMessageToRemoteBroker(MessageExtBrokerInner 
messageExt) {
+    public SendResult putMessageToRemoteBroker(MessageExtBrokerInner 
messageExt, String brokerNameToSend) {
+        if 
(this.brokerController.getBrokerConfig().getBrokerName().equals(brokerNameToSend))
 { // not remote broker
+            return null;
+        }
         final boolean isTransHalfMessage = 
TransactionalMessageUtil.buildHalfTopic().equals(messageExt.getTopic());
         MessageExtBrokerInner messageToPut = messageExt;
         if (isTransHalfMessage) {
@@ -125,12 +130,26 @@ public class EscapeBridge {
             return null;
         }
 
-        final MessageQueue mqSelected = 
topicPublishInfo.selectOneMessageQueue(this.brokerController.getBrokerConfig().getBrokerName());
-
-        messageToPut.setQueueId(mqSelected.getQueueId());
+        final MessageQueue mqSelected;
+        if (StringUtils.isEmpty(brokerNameToSend)) {
+            mqSelected = 
topicPublishInfo.selectOneMessageQueue(this.brokerController.getBrokerConfig().getBrokerName());
+            messageToPut.setQueueId(mqSelected.getQueueId());
+            brokerNameToSend = mqSelected.getBrokerName();
+            if 
(this.brokerController.getBrokerConfig().getBrokerName().equals(brokerNameToSend))
 {
+                LOG.warn("putMessageToRemoteBroker failed, remote broker not 
found. Topic: {}, MsgId: {}, Broker: {}",
+                        messageExt.getTopic(), messageExt.getMsgId(), 
brokerNameToSend);
+                return null;
+            }
+        } else {
+            mqSelected = new MessageQueue(messageExt.getTopic(), 
brokerNameToSend, messageExt.getQueueId());
+        }
 
-        final String brokerNameToSend = mqSelected.getBrokerName();
         final String brokerAddrToSend = 
this.brokerController.getTopicRouteInfoManager().findBrokerAddressInPublish(brokerNameToSend);
+        if (null == brokerAddrToSend) {
+            LOG.warn("putMessageToRemoteBroker failed, remote broker address 
not found. Topic: {}, MsgId: {}, Broker: {}",
+                    messageExt.getTopic(), messageExt.getMsgId(), 
brokerNameToSend);
+            return null;
+        }
 
         final long beginTimestamp = System.currentTimeMillis();
         try {
@@ -279,8 +298,12 @@ public class EscapeBridge {
                     }
                     List<MessageExt> list = decodeMsgList(result, 
deCompressBody);
                     if (list == null || list.isEmpty()) {
-                        LOG.warn("Can not get msg , topic {}, offset {}, 
queueId {}, result is {}", topic, offset, queueId, result);
-                        return Triple.of(null, "Can not get msg", false); // 
local store, so no retry
+                        // OFFSET_FOUND_NULL returned by TieredMessageStore 
indicates exception occurred
+                        boolean needRetry = 
GetMessageStatus.OFFSET_FOUND_NULL.equals(result.getStatus())
+                                && messageStore instanceof TieredMessageStore;
+                        LOG.warn("Can not get msg , topic {}, offset {}, 
queueId {}, needRetry {}, result is {}",
+                                topic, offset, queueId, needRetry, result);
+                        return Triple.of(null, "Can not get msg", needRetry);
                     }
                     return Triple.of(list.get(0), "", false);
                 });
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index 114d094600..4b141d2910 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -199,9 +199,8 @@ public class PopReviveService extends ServiceThread {
     }
 
     // Triple<MessageExt, info, needRetry>
-    private CompletableFuture<Triple<MessageExt, String, Boolean>> 
getBizMessage(String topic, long offset, int queueId,
-        String brokerName) {
-        return this.brokerController.getEscapeBridge().getMessageAsync(topic, 
offset, queueId, brokerName, false);
+    public CompletableFuture<Triple<MessageExt, String, Boolean>> 
getBizMessage(PopCheckPoint popCheckPoint, long offset) {
+        return 
this.brokerController.getEscapeBridge().getMessageAsync(popCheckPoint.getTopic(),
 offset, popCheckPoint.getQueueId(), popCheckPoint.getBrokerName(), false);
     }
 
     public PullResult getMessage(String group, String topic, int queueId, long 
offset, int nums,
@@ -358,7 +357,7 @@ public class PopReviveService extends ServiceThread {
                     if (point.getTopic() == null || point.getCId() == null) {
                         continue;
                     }
-                    map.put(point.getTopic() + point.getCId() + 
point.getQueueId() + point.getStartOffset() + point.getPopTime(), point);
+                    map.put(point.getTopic() + point.getCId() + 
point.getQueueId() + point.getStartOffset() + point.getPopTime() + 
point.getBrokerName(), point);
                     PopMetricsManager.incPopReviveCkGetCount(point, queueId);
                     point.setReviveOffset(messageExt.getQueueOffset());
                     if (firstRt == 0) {
@@ -371,7 +370,7 @@ public class PopReviveService extends ServiceThread {
                     }
                     AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class);
                     PopMetricsManager.incPopReviveAckGetCount(ackMsg, queueId);
-                    String mergeKey = ackMsg.getTopic() + 
ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + 
ackMsg.getPopTime();
+                    String mergeKey = ackMsg.getTopic() + 
ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + 
ackMsg.getPopTime() + ackMsg.getBrokerName();
                     PopCheckPoint point = map.get(mergeKey);
                     if (point == null) {
                         if 
(!brokerController.getBrokerConfig().isEnableSkipLongAwaitingAck()) {
@@ -396,7 +395,7 @@ public class PopReviveService extends ServiceThread {
 
                     BatchAckMsg bAckMsg = JSON.parseObject(raw, 
BatchAckMsg.class);
                     PopMetricsManager.incPopReviveAckGetCount(bAckMsg, 
queueId);
-                    String mergeKey = bAckMsg.getTopic() + 
bAckMsg.getConsumerGroup() + bAckMsg.getQueueId() + bAckMsg.getStartOffset() + 
bAckMsg.getPopTime();
+                    String mergeKey = bAckMsg.getTopic() + 
bAckMsg.getConsumerGroup() + bAckMsg.getQueueId() + bAckMsg.getStartOffset() + 
bAckMsg.getPopTime() + bAckMsg.getBrokerName();
                     PopCheckPoint point = map.get(mergeKey);
                     if (point == null) {
                         if 
(!brokerController.getBrokerConfig().isEnableSkipLongAwaitingAck()) {
@@ -528,7 +527,7 @@ public class PopReviveService extends ServiceThread {
 
             // retry msg
             long msgOffset = popCheckPoint.ackOffsetByIndex((byte) j);
-            CompletableFuture<Pair<Long, Boolean>> future = 
getBizMessage(popCheckPoint.getTopic(), msgOffset, popCheckPoint.getQueueId(), 
popCheckPoint.getBrokerName())
+            CompletableFuture<Pair<Long, Boolean>> future = 
getBizMessage(popCheckPoint, msgOffset)
                 .thenApply(rst -> {
                     MessageExt message = rst.getLeft();
                     if (message == null) {
@@ -568,9 +567,9 @@ public class PopReviveService extends ServiceThread {
 
     private void rePutCK(PopCheckPoint oldCK, Pair<Long, Boolean> pair) {
         int rePutTimes = oldCK.parseRePutTimes();
-        if (rePutTimes >= ckRewriteIntervalsInSeconds.length) {
-            POP_LOGGER.warn("rePut CK reach max times, drop it. {}, {}, {}, 
{}-{}, {}, {}", oldCK.getTopic(), oldCK.getCId(),
-                    oldCK.getBrokerName(), oldCK.getQueueId(), 
pair.getObject1(), oldCK.getPopTime(), oldCK.getInvisibleTime());
+        if (rePutTimes >= ckRewriteIntervalsInSeconds.length && 
brokerController.getBrokerConfig().isSkipWhenCKRePutReachMaxTimes()) {
+            POP_LOGGER.warn("rePut CK reach max times, drop it. {}, {}, {}, 
{}-{}, {}, {}, {}", oldCK.getTopic(), oldCK.getCId(),
+                    oldCK.getBrokerName(), oldCK.getQueueId(), 
pair.getObject1(), oldCK.getPopTime(), oldCK.getInvisibleTime(), rePutTimes);
             return;
         }
 
@@ -588,7 +587,8 @@ public class PopReviveService extends ServiceThread {
         newCk.setRePutTimes(String.valueOf(rePutTimes + 1)); // always 
increment even if removed from reviveRequestMap
         if (oldCK.getReviveTime() <= System.currentTimeMillis()) {
             // never expect an ACK matched in the future, we just use it to 
rewrite CK and try to revive retry message next time
-            newCk.setInvisibleTime(oldCK.getInvisibleTime() + 
ckRewriteIntervalsInSeconds[rePutTimes] * 1000);
+            int intervalIndex = rePutTimes >= 
ckRewriteIntervalsInSeconds.length ? ckRewriteIntervalsInSeconds.length - 1 : 
rePutTimes;
+            newCk.setInvisibleTime(oldCK.getInvisibleTime() + 
ckRewriteIntervalsInSeconds[intervalIndex] * 1000);
         }
         MessageExtBrokerInner ckMsg = 
brokerController.getPopMessageProcessor().buildCkMsg(newCk, queueId);
         brokerController.getMessageStore().putMessage(ckMsg);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java b/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java
index 7ea06665c3..27fc37dbec 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java
@@ -30,19 +30,23 @@ import org.apache.rocketmq.broker.out.BrokerOuterAPI;
 import org.apache.rocketmq.broker.topic.TopicRouteInfoManager;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.GetMessageResult;
+import org.apache.rocketmq.store.GetMessageStatus;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.PutMessageStatus;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 import org.apache.rocketmq.store.logfile.DefaultMappedFile;
 import org.apache.rocketmq.store.logfile.MappedFile;
+import org.apache.rocketmq.tieredstore.TieredMessageStore;
 import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
@@ -58,6 +62,9 @@ import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.verify;
 
 @RunWith(MockitoJUnitRunner.class)
 public class EscapeBridgeTest {
@@ -75,6 +82,9 @@ public class EscapeBridgeTest {
     @Mock
     private DefaultMessageStore defaultMessageStore;
 
+    @Mock
+    private TieredMessageStore tieredMessageStore;
+
     private GetMessageResult getMessageResult;
 
     @Mock
@@ -200,14 +210,37 @@ public class EscapeBridgeTest {
     }
 
     @Test
-    public void getMessageAsyncTest_localStore_decodeNothing() throws 
Exception {
+    public void 
getMessageAsyncTest_localStore_decodeNothing_DefaultMessageStore() throws 
Exception {
         
when(brokerController.getMessageStoreByBrokerName(any())).thenReturn(defaultMessageStore);
-        when(defaultMessageStore.getMessageAsync(anyString(), anyString(), 
anyInt(), anyLong(), anyInt(), any()))
-                
.thenReturn(CompletableFuture.completedFuture(mockGetMessageResult(0, 
TEST_TOPIC, null)));
-        Triple<MessageExt, String, Boolean> rst = 
escapeBridge.getMessageAsync(TEST_TOPIC, 0, DEFAULT_QUEUE_ID, BROKER_NAME, 
false).join();
-        Assert.assertNull(rst.getLeft());
-        Assert.assertEquals("Can not get msg", rst.getMiddle());
-        Assert.assertFalse(rst.getRight()); // no retry
+        for (GetMessageStatus status : GetMessageStatus.values()) {
+            GetMessageResult getMessageResult = mockGetMessageResult(0, 
TEST_TOPIC, null);
+            getMessageResult.setStatus(status);
+            when(defaultMessageStore.getMessageAsync(anyString(), anyString(), 
anyInt(), anyLong(), anyInt(), any()))
+                    
.thenReturn(CompletableFuture.completedFuture(getMessageResult));
+            Triple<MessageExt, String, Boolean> rst = 
escapeBridge.getMessageAsync(TEST_TOPIC, 0, DEFAULT_QUEUE_ID, BROKER_NAME, 
false).join();
+            Assert.assertNull(rst.getLeft());
+            Assert.assertEquals("Can not get msg", rst.getMiddle());
+            Assert.assertFalse(rst.getRight()); // DefaultMessageStore, no 
retry
+        }
+    }
+
+    @Test
+    public void 
getMessageAsyncTest_localStore_decodeNothing_TieredMessageStore() throws 
Exception {
+        
when(brokerController.getMessageStoreByBrokerName(any())).thenReturn(tieredMessageStore);
+        for (GetMessageStatus status : GetMessageStatus.values()) {
+            GetMessageResult getMessageResult = new GetMessageResult();
+            getMessageResult.setStatus(status);
+            when(tieredMessageStore.getMessageAsync(anyString(), anyString(), 
anyInt(), anyLong(), anyInt(), any()))
+                    
.thenReturn(CompletableFuture.completedFuture(getMessageResult));
+            Triple<MessageExt, String, Boolean> rst = 
escapeBridge.getMessageAsync(TEST_TOPIC, 0, DEFAULT_QUEUE_ID, BROKER_NAME, 
false).join();
+            Assert.assertNull(rst.getLeft());
+            Assert.assertEquals("Can not get msg", rst.getMiddle());
+            if (GetMessageStatus.OFFSET_FOUND_NULL.equals(status)) {
+                Assert.assertTrue(rst.getRight()); // TieredMessageStore 
returns OFFSET_FOUND_NULL, need retry
+            } else {
+                Assert.assertFalse(rst.getRight()); // other status, like 
DefaultMessageStore, no retry
+            }
+        }
     }
 
     @Test
@@ -320,6 +353,57 @@ public class EscapeBridgeTest {
         Assert.assertTrue(Arrays.equals(msg.getBody(), list.get(0).getBody()));
     }
 
+    @Test
+    public void 
testPutMessageToRemoteBroker_noSpecificBrokerName_hasRemoteBroker() throws 
Exception {
+        MessageExtBrokerInner message = new MessageExtBrokerInner();
+        message.setTopic(TEST_TOPIC);
+        String anotherBrokerName = "broker_b";
+        TopicPublishInfo publishInfo = mockTopicPublishInfo(BROKER_NAME, 
anotherBrokerName);
+        
when(topicRouteInfoManager.tryToFindTopicPublishInfo(anyString())).thenReturn(publishInfo);
+        
when(topicRouteInfoManager.findBrokerAddressInPublish(anotherBrokerName)).thenReturn("127.0.0.1");
+        escapeBridge.putMessageToRemoteBroker(message, null);
+        verify(brokerOuterAPI).sendMessageToSpecificBroker(eq("127.0.0.1"), 
eq(anotherBrokerName), any(MessageExtBrokerInner.class), anyString(), 
anyLong());
+    }
+
+    @Test
+    public void 
testPutMessageToRemoteBroker_noSpecificBrokerName_noRemoteBroker() throws 
Exception {
+        MessageExtBrokerInner message = new MessageExtBrokerInner();
+        message.setTopic(TEST_TOPIC);
+        TopicPublishInfo publishInfo = mockTopicPublishInfo(BROKER_NAME);
+        
when(topicRouteInfoManager.tryToFindTopicPublishInfo(anyString())).thenReturn(publishInfo);
+        escapeBridge.putMessageToRemoteBroker(message, null);
+        verify(topicRouteInfoManager, 
times(0)).findBrokerAddressInPublish(anyString());
+    }
+
+    @Test
+    public void testPutMessageToRemoteBroker_specificBrokerName_equals() 
throws Exception {
+        escapeBridge.putMessageToRemoteBroker(new MessageExtBrokerInner(), 
BROKER_NAME);
+        verify(topicRouteInfoManager, 
times(0)).tryToFindTopicPublishInfo(anyString());
+    }
+
+    @Test
+    public void 
testPutMessageToRemoteBroker_specificBrokerName_addressNotFound() throws 
Exception {
+        MessageExtBrokerInner message = new MessageExtBrokerInner();
+        message.setTopic(TEST_TOPIC);
+        TopicPublishInfo publishInfo = mockTopicPublishInfo(BROKER_NAME);
+        
when(topicRouteInfoManager.tryToFindTopicPublishInfo(anyString())).thenReturn(publishInfo);
+        escapeBridge.putMessageToRemoteBroker(message, "whatever");
+        
verify(topicRouteInfoManager).findBrokerAddressInPublish(eq("whatever"));
+        verify(brokerOuterAPI, 
times(0)).sendMessageToSpecificBroker(anyString(), anyString(), 
any(MessageExtBrokerInner.class), anyString(), anyLong());
+    }
+
+    @Test
+    public void testPutMessageToRemoteBroker_specificBrokerName_addressFound() 
throws Exception {
+        MessageExtBrokerInner message = new MessageExtBrokerInner();
+        message.setTopic(TEST_TOPIC);
+        String anotherBrokerName = "broker_b";
+        TopicPublishInfo publishInfo = mockTopicPublishInfo(BROKER_NAME, 
anotherBrokerName);
+        
when(topicRouteInfoManager.tryToFindTopicPublishInfo(anyString())).thenReturn(publishInfo);
+        
when(topicRouteInfoManager.findBrokerAddressInPublish(anotherBrokerName)).thenReturn("127.0.0.1");
+        escapeBridge.putMessageToRemoteBroker(message, anotherBrokerName);
+        verify(brokerOuterAPI).sendMessageToSpecificBroker(eq("127.0.0.1"), 
eq(anotherBrokerName), any(MessageExtBrokerInner.class), anyString(), 
anyLong());
+    }
+
     private GetMessageResult mockGetMessageResult(int count, String topic, 
byte[] body) throws Exception {
         GetMessageResult result = new GetMessageResult();
         for (int i = 0; i < count; i++) {
@@ -337,4 +421,12 @@ public class EscapeBridgeTest {
         return result;
     }
 
+    private TopicPublishInfo mockTopicPublishInfo(String... brokerNames) {
+        TopicPublishInfo topicPublishInfo = new TopicPublishInfo();
+        for (String brokerName : brokerNames) {
+            topicPublishInfo.getMessageQueueList().add(new 
MessageQueue(TEST_TOPIC, brokerName, 0));
+        }
+        return topicPublishInfo;
+    }
+
 }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
index d7ea97c550..3010e83610 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
@@ -104,7 +104,6 @@ public class PopReviveServiceTest {
         brokerConfig = new BrokerConfig();
         brokerConfig.setBrokerClusterName(CLUSTER_NAME);
         when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
-        when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
         
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
         when(brokerController.getMessageStore()).thenReturn(messageStore);
         
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
@@ -285,6 +284,7 @@ public class PopReviveServiceTest {
 
     @Test
     public void 
testReviveMsgFromCk_messageFound_writeRetryFailed_rewriteCK_end() throws 
Throwable {
+        brokerConfig.setSkipWhenCKRePutReachMaxTimes(true);
         PopCheckPoint ck = buildPopCheckPoint(0, 0, 0);
         ck.setRePutTimes("17");
         PopReviveService.ConsumeReviveObj reviveObj = new 
PopReviveService.ConsumeReviveObj();
@@ -306,6 +306,30 @@ public class PopReviveServiceTest {
         verify(messageStore, 
times(0)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK
     }
 
+    @Test
+    public void 
testReviveMsgFromCk_messageFound_writeRetryFailed_rewriteCK_noEnd() throws 
Throwable {
+        brokerConfig.setSkipWhenCKRePutReachMaxTimes(false);
+        PopCheckPoint ck = buildPopCheckPoint(0, 0, 0);
+        ck.setRePutTimes(Byte.MAX_VALUE + "");
+        PopReviveService.ConsumeReviveObj reviveObj = new 
PopReviveService.ConsumeReviveObj();
+        reviveObj.map.put("", ck);
+        reviveObj.endTime = System.currentTimeMillis();
+        StringBuilder actualRetryTopic = new StringBuilder();
+
+        when(escapeBridge.getMessageAsync(anyString(), anyLong(), anyInt(), 
anyString(), anyBoolean()))
+                .thenReturn(CompletableFuture.completedFuture(Triple.of(new 
MessageExt(), "", false)));
+        
when(escapeBridge.putMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenAnswer(invocation
 -> {
+            MessageExtBrokerInner msg = invocation.getArgument(0);
+            actualRetryTopic.append(msg.getTopic());
+            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new 
AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED));
+        });
+
+        popReviveService.mergeAndRevive(reviveObj);
+        Assert.assertEquals(KeyBuilder.buildPopRetryTopic(TOPIC, GROUP, 
false), actualRetryTopic.toString());
+        verify(escapeBridge, 
times(1)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class)); // write 
retry
+        verify(messageStore, 
times(1)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK
+    }
+
     @Test
     public void testReviveMsgFromCk_messageNotFound_noRetry() throws Throwable 
{
         PopCheckPoint ck = buildPopCheckPoint(0, 0, 0);
@@ -349,6 +373,7 @@ public class PopReviveServiceTest {
 
     @Test
     public void testReviveMsgFromCk_messageNotFound_needRetry_end() throws 
Throwable {
+        brokerConfig.setSkipWhenCKRePutReachMaxTimes(true);
         PopCheckPoint ck = buildPopCheckPoint(0, 0, 0);
         ck.setRePutTimes("17");
         PopReviveService.ConsumeReviveObj reviveObj = new 
PopReviveService.ConsumeReviveObj();
@@ -363,6 +388,23 @@ public class PopReviveServiceTest {
         verify(messageStore, 
times(0)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK
     }
 
+    @Test
+    public void testReviveMsgFromCk_messageNotFound_needRetry_noEnd() throws 
Throwable {
+        brokerConfig.setSkipWhenCKRePutReachMaxTimes(false);
+        PopCheckPoint ck = buildPopCheckPoint(0, 0, 0);
+        ck.setRePutTimes(Byte.MAX_VALUE + "");
+        PopReviveService.ConsumeReviveObj reviveObj = new 
PopReviveService.ConsumeReviveObj();
+        reviveObj.map.put("", ck);
+        reviveObj.endTime = System.currentTimeMillis();
+
+        when(escapeBridge.getMessageAsync(anyString(), anyLong(), anyInt(), 
anyString(), anyBoolean()))
+                .thenReturn(CompletableFuture.completedFuture(Triple.of(null, 
"", true)));
+
+        popReviveService.mergeAndRevive(reviveObj);
+        verify(escapeBridge, 
times(0)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class)); // write 
retry
+        verify(messageStore, 
times(1)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK
+    }
+
     public static PopCheckPoint buildPopCheckPoint(long startOffset, long 
popTime, long reviveOffset) {
         PopCheckPoint ck = new PopCheckPoint();
         ck.setStartOffset(startOffset);
@@ -386,6 +428,7 @@ public class PopReviveServiceTest {
         ackMsg.setTopic(TOPIC);
         ackMsg.setQueueId(0);
         ackMsg.setPopTime(popTime);
+        ackMsg.setBrokerName("broker-a");
 
         return ackMsg;
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 8982e59d03..10bf7f76e8 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -419,6 +419,9 @@ public class BrokerConfig extends BrokerIdentity {
      */
     private String configBlackList = "configBlackList;brokerConfigPath";
 
+    // if false, will still rewrite ck after max times 17
+    private boolean skipWhenCKRePutReachMaxTimes = false;
+
     public String getConfigBlackList() {
         return configBlackList;
     }
@@ -1826,4 +1829,12 @@ public class BrokerConfig extends BrokerIdentity {
     public void setEnablePopMessageThreshold(boolean 
enablePopMessageThreshold) {
         this.enablePopMessageThreshold = enablePopMessageThreshold;
     }
+
+    public boolean isSkipWhenCKRePutReachMaxTimes() {
+        return skipWhenCKRePutReachMaxTimes;
+    }
+
+    public void setSkipWhenCKRePutReachMaxTimes(boolean 
skipWhenCKRePutReachMaxTimes) {
+        this.skipWhenCKRePutReachMaxTimes = skipWhenCKRePutReachMaxTimes;
+    }
 }

Reply via email to