This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new b638d4cfbb [ISSUE #8460] Set default broker name when revive found ack 
without broker name field (#8981)
b638d4cfbb is described below

commit b638d4cfbbdfcbd6a8aa2feee580519f68d85730
Author: lizhimins <707364...@qq.com>
AuthorDate: Mon Nov 25 19:24:43 2024 +0800

    [ISSUE #8460] Set default broker name when revive found ack without broker 
name field (#8981)
---
 .../org/apache/rocketmq/broker/processor/PopReviveService.java   | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index f27934efdf..e1ead86169 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
@@ -376,7 +377,9 @@ public class PopReviveService extends ServiceThread {
                     }
                     AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class);
                     PopMetricsManager.incPopReviveAckGetCount(ackMsg, queueId);
-                    String mergeKey = ackMsg.getTopic() + 
ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + 
ackMsg.getPopTime() + ackMsg.getBrokerName();
+                    String brokerName = 
StringUtils.isNotBlank(ackMsg.getBrokerName()) ?
+                        ackMsg.getBrokerName() : 
brokerController.getBrokerConfig().getBrokerName();
+                    String mergeKey = ackMsg.getTopic() + 
ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + 
ackMsg.getPopTime() + brokerName;
                     PopCheckPoint point = map.get(mergeKey);
                     if (point == null) {
                         if 
(!brokerController.getBrokerConfig().isEnableSkipLongAwaitingAck()) {
@@ -401,7 +404,9 @@ public class PopReviveService extends ServiceThread {
 
                     BatchAckMsg bAckMsg = JSON.parseObject(raw, 
BatchAckMsg.class);
                     PopMetricsManager.incPopReviveAckGetCount(bAckMsg, 
queueId);
-                    String mergeKey = bAckMsg.getTopic() + 
bAckMsg.getConsumerGroup() + bAckMsg.getQueueId() + bAckMsg.getStartOffset() + 
bAckMsg.getPopTime() + bAckMsg.getBrokerName();
+                    String brokerName = 
StringUtils.isNotBlank(bAckMsg.getBrokerName()) ?
+                        bAckMsg.getBrokerName() : 
brokerController.getBrokerConfig().getBrokerName();
+                    String mergeKey = bAckMsg.getTopic() + 
bAckMsg.getConsumerGroup() + bAckMsg.getQueueId() + bAckMsg.getStartOffset() + 
bAckMsg.getPopTime() + brokerName;
                     PopCheckPoint point = map.get(mergeKey);
                     if (point == null) {
                         if 
(!brokerController.getBrokerConfig().isEnableSkipLongAwaitingAck()) {

Reply via email to