This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new faae64715d [ISSUE #7601] Fix slave acting master bug (#7603) faae64715d is described below commit faae64715d917bb5d64b8d72581172d26ebe9501 Author: gaoyf <ga...@users.noreply.github.com> AuthorDate: Thu Dec 7 11:25:22 2023 +0800 [ISSUE #7601] Fix slave acting master bug (#7603) * fix NullPointerException when message escape to remote * fix NumberFormatException when message retry to escape to remote * fix timerCheckPoint of the master is not updated, causing the timer message to be replayed after master is restarted * Use properties copies instead of referencing the same map when converting message --- .../java/org/apache/rocketmq/broker/BrokerController.java | 1 + .../org/apache/rocketmq/broker/slave/SlaveSynchronize.java | 4 +++- .../org/apache/rocketmq/common/message/MessageAccessor.java | 7 +++++++ .../org/apache/rocketmq/store/timer/TimerMessageStore.java | 12 +++++++++--- 4 files changed, 20 insertions(+), 4 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 9f1fd0ad02..8d29d44383 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -2108,6 +2108,7 @@ public class BrokerController { isScheduleServiceStart = shouldStart; if (timerMessageStore != null) { + timerMessageStore.syncLastReadTimeMs(); timerMessageStore.setShouldRunningDequeue(shouldStart); } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java index 53cdecdf85..7f802adb93 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java @@ -215,11 +215,13 @@ public class SlaveSynchronize { String masterAddrBak = this.masterAddr; if (masterAddrBak != null) { try { - if (null != brokerController.getMessageStore().getTimerMessageStore()) { + if (null != brokerController.getMessageStore().getTimerMessageStore() && + !brokerController.getTimerMessageStore().isShouldRunningDequeue()) { TimerCheckpoint checkpoint = this.brokerController.getBrokerOuterAPI().getTimerCheckPoint(masterAddrBak); if (null != this.brokerController.getTimerCheckpoint()) { this.brokerController.getTimerCheckpoint().setLastReadTimeMs(checkpoint.getLastReadTimeMs()); this.brokerController.getTimerCheckpoint().setMasterTimerQueueOffset(checkpoint.getMasterTimerQueueOffset()); + this.brokerController.getTimerCheckpoint().getDataVersion().assignNewOne(checkpoint.getDataVersion()); } } } catch (Exception e) { diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java index 1b7e2bba32..62e3bbd7e6 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.common.message; +import java.util.HashMap; import java.util.Map; public class MessageAccessor { @@ -96,4 +97,10 @@ public class MessageAccessor { return newMsg; } + public static Map<String, String> deepCopyProperties(Map<String, String> properties) { + if (properties == null) { + return null; + } + return new HashMap<>(properties); + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java index d796e4467d..872cd71054 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java @@ -602,6 +602,10 @@ public class TimerMessageStore { this.shouldRunningDequeue = shouldRunningDequeue; } + public boolean isShouldRunningDequeue() { + return shouldRunningDequeue; + } + public void addMetric(MessageExt msg, int value) { try { if (null == msg || null == msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC)) { @@ -1084,8 +1088,10 @@ public class TimerMessageStore { case PUT_OK: if (brokerStatsManager != null) { this.brokerStatsManager.incTopicPutNums(message.getTopic(), 1, 1); - this.brokerStatsManager.incTopicPutSize(message.getTopic(), - putMessageResult.getAppendMessageResult().getWroteBytes()); + if (putMessageResult.getAppendMessageResult() != null) { + this.brokerStatsManager.incTopicPutSize(message.getTopic(), + putMessageResult.getAppendMessageResult().getWroteBytes()); + } this.brokerStatsManager.incBrokerPutNums(message.getTopic(), 1); } return PUT_OK; @@ -1119,7 +1125,7 @@ public class TimerMessageStore { MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); msgInner.setBody(msgExt.getBody()); msgInner.setFlag(msgExt.getFlag()); - MessageAccessor.setProperties(msgInner, msgExt.getProperties()); + MessageAccessor.setProperties(msgInner, MessageAccessor.deepCopyProperties(msgExt.getProperties())); TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag()); long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());