This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new faae64715d [ISSUE #7601] Fix slave acting master bug (#7603)
faae64715d is described below

commit faae64715d917bb5d64b8d72581172d26ebe9501
Author: gaoyf <ga...@users.noreply.github.com>
AuthorDate: Thu Dec 7 11:25:22 2023 +0800

    [ISSUE #7601] Fix slave acting master bug (#7603)
    
    * fix NullPointerException when a message escapes to remote
    
    * fix NumberFormatException when a message retries escaping to remote
    
    * fix timerCheckPoint of the master not being updated, which caused timer
      messages to be replayed after the master is restarted
    
    * Use a copy of the properties instead of referencing the same map when
      converting a message
---
 .../java/org/apache/rocketmq/broker/BrokerController.java    |  1 +
 .../org/apache/rocketmq/broker/slave/SlaveSynchronize.java   |  4 +++-
 .../org/apache/rocketmq/common/message/MessageAccessor.java  |  7 +++++++
 .../org/apache/rocketmq/store/timer/TimerMessageStore.java   | 12 +++++++++---
 4 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 9f1fd0ad02..8d29d44383 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -2108,6 +2108,7 @@ public class BrokerController {
             isScheduleServiceStart = shouldStart;
 
             if (timerMessageStore != null) {
+                timerMessageStore.syncLastReadTimeMs();
                 timerMessageStore.setShouldRunningDequeue(shouldStart);
             }
         }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
index 53cdecdf85..7f802adb93 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
@@ -215,11 +215,13 @@ public class SlaveSynchronize {
         String masterAddrBak = this.masterAddr;
         if (masterAddrBak != null) {
             try {
-                if (null != 
brokerController.getMessageStore().getTimerMessageStore()) {
+                if (null != 
brokerController.getMessageStore().getTimerMessageStore() &&
+                        
!brokerController.getTimerMessageStore().isShouldRunningDequeue()) {
                     TimerCheckpoint checkpoint = 
this.brokerController.getBrokerOuterAPI().getTimerCheckPoint(masterAddrBak);
                     if (null != this.brokerController.getTimerCheckpoint()) {
                         
this.brokerController.getTimerCheckpoint().setLastReadTimeMs(checkpoint.getLastReadTimeMs());
                         
this.brokerController.getTimerCheckpoint().setMasterTimerQueueOffset(checkpoint.getMasterTimerQueueOffset());
+                        
this.brokerController.getTimerCheckpoint().getDataVersion().assignNewOne(checkpoint.getDataVersion());
                     }
                 }
             } catch (Exception e) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
index 1b7e2bba32..62e3bbd7e6 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.common.message;
 
+import java.util.HashMap;
 import java.util.Map;
 
 public class MessageAccessor {
@@ -96,4 +97,10 @@ public class MessageAccessor {
         return newMsg;
     }
 
+    public static Map<String, String> deepCopyProperties(Map<String, String> 
properties) {
+        if (properties == null) {
+            return null;
+        }
+        return new HashMap<>(properties);
+    }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index d796e4467d..872cd71054 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -602,6 +602,10 @@ public class TimerMessageStore {
         this.shouldRunningDequeue = shouldRunningDequeue;
     }
 
+    public boolean isShouldRunningDequeue() {
+        return shouldRunningDequeue;
+    }
+
     public void addMetric(MessageExt msg, int value) {
         try {
             if (null == msg || null == 
msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC)) {
@@ -1084,8 +1088,10 @@ public class TimerMessageStore {
                     case PUT_OK:
                         if (brokerStatsManager != null) {
                             
this.brokerStatsManager.incTopicPutNums(message.getTopic(), 1, 1);
-                            
this.brokerStatsManager.incTopicPutSize(message.getTopic(),
-                                
putMessageResult.getAppendMessageResult().getWroteBytes());
+                            if (putMessageResult.getAppendMessageResult() != 
null) {
+                                
this.brokerStatsManager.incTopicPutSize(message.getTopic(),
+                                        
putMessageResult.getAppendMessageResult().getWroteBytes());
+                            }
                             
this.brokerStatsManager.incBrokerPutNums(message.getTopic(), 1);
                         }
                         return PUT_OK;
@@ -1119,7 +1125,7 @@ public class TimerMessageStore {
         MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
         msgInner.setBody(msgExt.getBody());
         msgInner.setFlag(msgExt.getFlag());
-        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
+        MessageAccessor.setProperties(msgInner, 
MessageAccessor.deepCopyProperties(msgExt.getProperties()));
         TopicFilterType topicFilterType = 
MessageExt.parseTopicFilterType(msgInner.getSysFlag());
         long tagsCodeValue =
             MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, 
msgInner.getTags());

Reply via email to