This is an automated email from the ASF dual-hosted git repository. lollipop pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new b638d4cfbb [ISSUE #8460] Set default broker name when revive found ack without broker name field (#8981) b638d4cfbb is described below commit b638d4cfbbdfcbd6a8aa2feee580519f68d85730 Author: lizhimins <707364...@qq.com> AuthorDate: Mon Nov 25 19:24:43 2024 +0800 [ISSUE #8460] Set default broker name when revive found ack without broker name field (#8981) --- .../org/apache/rocketmq/broker/processor/PopReviveService.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java index f27934efdf..e1ead86169 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Triple; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.metrics.BrokerMetricsManager; @@ -376,7 +377,9 @@ public class PopReviveService extends ServiceThread { } AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class); PopMetricsManager.incPopReviveAckGetCount(ackMsg, queueId); - String mergeKey = ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime() + ackMsg.getBrokerName(); + String brokerName = StringUtils.isNotBlank(ackMsg.getBrokerName()) ? + ackMsg.getBrokerName() : brokerController.getBrokerConfig().getBrokerName(); + String mergeKey = ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime() + brokerName; PopCheckPoint point = map.get(mergeKey); if (point == null) { if (!brokerController.getBrokerConfig().isEnableSkipLongAwaitingAck()) { @@ -401,7 +404,9 @@ public class PopReviveService extends ServiceThread { BatchAckMsg bAckMsg = JSON.parseObject(raw, BatchAckMsg.class); PopMetricsManager.incPopReviveAckGetCount(bAckMsg, queueId); - String mergeKey = bAckMsg.getTopic() + bAckMsg.getConsumerGroup() + bAckMsg.getQueueId() + bAckMsg.getStartOffset() + bAckMsg.getPopTime() + bAckMsg.getBrokerName(); + String brokerName = StringUtils.isNotBlank(bAckMsg.getBrokerName()) ? + bAckMsg.getBrokerName() : brokerController.getBrokerConfig().getBrokerName(); + String mergeKey = bAckMsg.getTopic() + bAckMsg.getConsumerGroup() + bAckMsg.getQueueId() + bAckMsg.getStartOffset() + bAckMsg.getPopTime() + brokerName; PopCheckPoint point = map.get(mergeKey); if (point == null) { if (!brokerController.getBrokerConfig().isEnableSkipLongAwaitingAck()) {