This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d1fd7af3f1 [ISSUE #8979] Add configurable switch for timer message retry logic  (#8980)
d1fd7af3f1 is described below

commit d1fd7af3f12e1bfa30c7baa7c3f687168a9f5dbf
Author: 小陈 <jamiechen....@gmail.com>
AuthorDate: Thu Dec 5 17:49:03 2024 +0800

    [ISSUE #8979] Add configurable switch for timer message retry logic  (#8980)
---
 .../rocketmq/store/config/MessageStoreConfig.java  |   9 ++
 .../rocketmq/store/timer/TimerMessageStore.java    | 121 ++++++++++++---------
 .../store/timer/TimerMessageStoreTest.java         |  87 ++++++++++++---
 3 files changed, 149 insertions(+), 68 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 6dfdc0b1c8..0ea5841548 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -99,6 +99,7 @@ public class MessageStoreConfig {
     private boolean timerSkipUnknownError = false;
     private boolean timerWarmEnable = false;
     private boolean timerStopDequeue = false;
+    private boolean timerEnableRetryUntilSuccess = false;
     private int timerCongestNumEachSlot = Integer.MAX_VALUE;
 
     private int timerMetricSmallThreshold = 1000000;
@@ -1689,6 +1690,14 @@ public class MessageStoreConfig {
         this.timerSkipUnknownError = timerSkipUnknownError;
     }
 
+    public boolean isTimerEnableRetryUntilSuccess() {
+        return timerEnableRetryUntilSuccess;
+    }
+
+    public void setTimerEnableRetryUntilSuccess(boolean timerEnableRetryUntilSuccess) {
+        this.timerEnableRetryUntilSuccess = timerEnableRetryUntilSuccess;
+    }
+
     public boolean isTimerWarmEnable() {
         return timerWarmEnable;
     }
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index fb166678e6..2b14618eed 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -1097,46 +1097,44 @@ public class TimerMessageStore {
             putMessageResult = messageStore.putMessage(message);
         }
 
-        int retryNum = 0;
-        while (retryNum < 3) {
-            if (null == putMessageResult || null == putMessageResult.getPutMessageStatus()) {
-                retryNum++;
-            } else {
-                switch (putMessageResult.getPutMessageStatus()) {
-                    case PUT_OK:
-                        if (brokerStatsManager != null) {
-                            this.brokerStatsManager.incTopicPutNums(message.getTopic(), 1, 1);
-                            if (putMessageResult.getAppendMessageResult() != null) {
-                                this.brokerStatsManager.incTopicPutSize(message.getTopic(),
-                                        putMessageResult.getAppendMessageResult().getWroteBytes());
-                            }
-                            this.brokerStatsManager.incBrokerPutNums(message.getTopic(), 1);
+        if (putMessageResult != null && putMessageResult.getPutMessageStatus() != null) {
+            switch (putMessageResult.getPutMessageStatus()) {
+                case PUT_OK:
+                    if (brokerStatsManager != null) {
+                        brokerStatsManager.incTopicPutNums(message.getTopic(), 1, 1);
+                        if (putMessageResult.getAppendMessageResult() != null) {
+                            brokerStatsManager.incTopicPutSize(message.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());
                         }
-                        return PUT_OK;
-                    case SERVICE_NOT_AVAILABLE:
-                        return PUT_NEED_RETRY;
-                    case MESSAGE_ILLEGAL:
-                    case PROPERTIES_SIZE_EXCEEDED:
+                        brokerStatsManager.incBrokerPutNums(message.getTopic(), 1);
+                    }
+                    return PUT_OK;
+
+                case MESSAGE_ILLEGAL:
+                case PROPERTIES_SIZE_EXCEEDED:
+                case WHEEL_TIMER_NOT_ENABLE:
+                case WHEEL_TIMER_MSG_ILLEGAL:
+                    return PUT_NO_RETRY;
+
+                case SERVICE_NOT_AVAILABLE:
+                case FLUSH_DISK_TIMEOUT:
+                case FLUSH_SLAVE_TIMEOUT:
+                case OS_PAGE_CACHE_BUSY:
+                case CREATE_MAPPED_FILE_FAILED:
+                case SLAVE_NOT_AVAILABLE:
+                    return PUT_NEED_RETRY;
+
+                case UNKNOWN_ERROR:
+                default:
+                    if (storeConfig.isTimerSkipUnknownError()) {
+                        LOGGER.warn("Skipping message due to unknown error, msg: {}", message);
                         return PUT_NO_RETRY;
-                    case CREATE_MAPPED_FILE_FAILED:
-                    case FLUSH_DISK_TIMEOUT:
-                    case FLUSH_SLAVE_TIMEOUT:
-                    case OS_PAGE_CACHE_BUSY:
-                    case SLAVE_NOT_AVAILABLE:
-                    case UNKNOWN_ERROR:
-                    default:
-                        retryNum++;
-                }
-            }
-            Thread.sleep(50);
-            if (escapeBridgeHook != null) {
-                putMessageResult = escapeBridgeHook.apply(message);
-            } else {
-                putMessageResult = messageStore.putMessage(message);
+                    } else {
+                        holdMomentForUnknownError();
+                        return PUT_NEED_RETRY;
+                    }
             }
-            LOGGER.warn("Retrying to do put timer msg retryNum:{} putRes:{} msg:{}", retryNum, putMessageResult, message);
         }
-        return PUT_NO_RETRY;
+        return PUT_NEED_RETRY;
     }
 
     public MessageExtBrokerInner convertMessage(MessageExt msgExt, boolean needRoll) {
@@ -1471,7 +1469,6 @@ public class TimerMessageStore {
     }
 
     public class TimerDequeuePutMessageService extends AbstractStateService {
-
         @Override
         public String getServiceName() {
             return getServiceThreadName() + this.getClass().getSimpleName();
@@ -1481,6 +1478,7 @@ public class TimerMessageStore {
         public void run() {
             setState(AbstractStateService.START);
             TimerMessageStore.LOGGER.info(this.getServiceName() + " service start");
+
             while (!this.isStopped() || dequeuePutQueue.size() != 0) {
                 try {
                     setState(AbstractStateService.WAITING);
@@ -1488,41 +1486,63 @@ public class TimerMessageStore {
                     if (null == tr) {
                         continue;
                     }
+
                     setState(AbstractStateService.RUNNING);
-                    boolean doRes = false;
                     boolean tmpDequeueChangeFlag = false;
+
                     try {
-                        while (!isStopped() && !doRes) {
+                        while (!isStopped()) {
                             if (!isRunningDequeue()) {
                                 dequeueStatusChangeFlag = true;
                                 tmpDequeueChangeFlag = true;
                                 break;
                             }
+
                             try {
                                 perfCounterTicks.startTick(DEQUEUE_PUT);
+
                                 MessageExt msgExt = tr.getMsg();
                                 DefaultStoreMetricsManager.incTimerDequeueCount(getRealTopic(msgExt));
+
                                 if (tr.getEnqueueTime() == Long.MAX_VALUE) {
-                                    // never enqueue, mark it.
+                                    // Never enqueue, mark it.
                                     MessageAccessor.putProperty(msgExt, TIMER_ENQUEUE_MS, String.valueOf(Long.MAX_VALUE));
                                 }
+
                                 addMetric(msgExt, -1);
                                 MessageExtBrokerInner msg = convert(msgExt, tr.getEnqueueTime(), needRoll(tr.getMagic()));
-                                doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic()));
-                                while (!doRes && !isStopped()) {
-                                    if (!isRunningDequeue()) {
-                                        dequeueStatusChangeFlag = true;
-                                        tmpDequeueChangeFlag = true;
-                                        break;
+
+                                boolean processed = false;
+                                int retryCount = 0;
+
+                                while (!processed && !isStopped()) {
+                                    int result = doPut(msg, needRoll(tr.getMagic()));
+
+                                    if (result == PUT_OK) {
+                                        processed = true;
+                                    } else if (result == PUT_NO_RETRY) {
+                                        TimerMessageStore.LOGGER.warn("Skipping message due to unrecoverable error. Msg: {}", msg);
+                                        processed = true;
+                                    } else {
+                                        retryCount++;
+                                        // Without enabling TimerEnableRetryUntilSuccess, messages will retry up to 3 times before being discarded
+                                        if (!storeConfig.isTimerEnableRetryUntilSuccess() && retryCount >= 3) {
+                                            TimerMessageStore.LOGGER.error("Message processing failed after {} retries. Msg: {}", retryCount, msg);
+                                            processed = true;
+                                        } else {
+                                            Thread.sleep(500L * precisionMs / 1000);
+                                            TimerMessageStore.LOGGER.warn("Retrying to process message. Retry count: {}, Msg: {}", retryCount, msg);
+                                        }
                                     }
-                                    doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic()));
-                                    Thread.sleep(500L * precisionMs / 1000);
                                 }
+
                                 perfCounterTicks.endTick(DEQUEUE_PUT);
+                                break;
+
                             } catch (Throwable t) {
-                                LOGGER.info("Unknown error", t);
+                                TimerMessageStore.LOGGER.info("Unknown error", t);
                                 if (storeConfig.isTimerSkipUnknownError()) {
-                                    doRes = true;
+                                    break;
                                 } else {
                                     holdMomentForUnknownError();
                                 }
@@ -1531,7 +1551,6 @@ public class TimerMessageStore {
                     } finally {
                         tr.idempotentRelease(!tmpDequeueChangeFlag);
                     }
-
                 } catch (Throwable e) {
                     TimerMessageStore.LOGGER.error("Error occurred in " + getServiceName(), e);
                 }
diff --git a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
index 4ce3985f6c..52e58efde2 100644
--- a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.store.timer;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
@@ -30,6 +31,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.common.BrokerConfig;
@@ -40,23 +42,26 @@ import org.apache.rocketmq.common.message.MessageClientIDSetter;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.store.ConsumeQueue;
-import org.apache.rocketmq.store.DefaultMessageStore;
-import org.apache.rocketmq.store.GetMessageResult;
-import org.apache.rocketmq.store.GetMessageStatus;
-import org.apache.rocketmq.store.MessageArrivingListener;
 import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.PutMessageStatus;
 import org.apache.rocketmq.store.config.FlushDiskType;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.apache.rocketmq.store.ConsumeQueue;
+import org.apache.rocketmq.store.AppendMessageStatus;
+import org.apache.rocketmq.store.AppendMessageResult;
+import org.apache.rocketmq.store.GetMessageResult;
+import org.apache.rocketmq.store.GetMessageStatus;
+import org.apache.rocketmq.store.MessageArrivingListener;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.awaitility.Awaitility.await;
@@ -65,10 +70,16 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
 
 public class TimerMessageStoreTest {
     private final byte[] msgBody = new byte[1024];
     private static MessageStore messageStore;
+    private MessageStore mockMessageStore;
     private SocketAddress bornHost;
     private SocketAddress storeHost;
 
@@ -100,21 +111,23 @@ public class TimerMessageStoreTest {
         storeConfig.setTimerInterceptDelayLevel(true);
         storeConfig.setTimerPrecisionMs(precisionMs);
 
+        mockMessageStore = Mockito.mock(MessageStore.class);
         messageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("TimerTest",false), new MyMessageArrivingListener(), new BrokerConfig(), new ConcurrentHashMap<>());
         boolean load = messageStore.load();
         assertTrue(load);
         messageStore.start();
     }
 
-    public TimerMessageStore createTimerMessageStore(String rootDir) throws IOException {
+    public TimerMessageStore createTimerMessageStore(String rootDir , boolean needMock) throws IOException {
         if (null == rootDir) {
             rootDir = StoreTestUtils.createBaseDir();
         }
 
         TimerCheckpoint timerCheckpoint = new TimerCheckpoint(rootDir + File.separator + "config" + File.separator + "timercheck");
         TimerMetrics timerMetrics = new TimerMetrics(rootDir + File.separator + "config" + File.separator + "timermetrics");
-        TimerMessageStore timerMessageStore = new TimerMessageStore(messageStore, storeConfig, timerCheckpoint, timerMetrics, null);
-        messageStore.setTimerMessageStore(timerMessageStore);
+        MessageStore ms = needMock ? mockMessageStore : messageStore;
+        TimerMessageStore timerMessageStore = new TimerMessageStore(ms, storeConfig, timerCheckpoint, timerMetrics, null);
+        ms.setTimerMessageStore(timerMessageStore);
 
         baseDirs.add(rootDir);
         timerStores.add(timerMessageStore);
@@ -170,7 +183,7 @@ public class TimerMessageStoreTest {
         Assume.assumeFalse(MixAll.isWindows());
         String topic = "TimerTest_testPutTimerMessage";
 
-        final TimerMessageStore timerMessageStore = createTimerMessageStore(null);
+        final TimerMessageStore timerMessageStore = createTimerMessageStore(null , false);
         timerMessageStore.load();
         timerMessageStore.start(true);
 
@@ -212,12 +225,52 @@ public class TimerMessageStoreTest {
         }
     }
 
+    @Test
+    public void testRetryUntilSuccess() throws Exception {
+        storeConfig.setTimerEnableRetryUntilSuccess(true);
+        TimerMessageStore timerMessageStore = createTimerMessageStore(null , true);
+        timerMessageStore.load();
+        timerMessageStore.setShouldRunningDequeue(true);
+        Field stateField = TimerMessageStore.class.getDeclaredField("state");
+        stateField.setAccessible(true);
+        stateField.set(timerMessageStore, TimerMessageStore.RUNNING);
+
+        MessageExtBrokerInner msg = buildMessage(3000L, "TestRetry", true);
+        transformTimerMessage(timerMessageStore, msg);
+        TimerRequest timerRequest = new TimerRequest(100, 200, 3000, System.currentTimeMillis(), 0, msg);
+        boolean offered = timerMessageStore.dequeuePutQueue.offer(timerRequest);
+        assertTrue(offered);
+        assertFalse(timerMessageStore.dequeuePutQueue.isEmpty());
+
+        // If enableRetryUntilSuccess is set and putMessage return NEED_RETRY type, the message should be retried until success.
+        when(mockMessageStore.putMessage(any(MessageExtBrokerInner.class)))
+                .thenReturn(new PutMessageResult(PutMessageStatus.FLUSH_DISK_TIMEOUT, null))
+                .thenReturn(new PutMessageResult(PutMessageStatus.FLUSH_SLAVE_TIMEOUT, null))
+                .thenReturn(new PutMessageResult(PutMessageStatus.OS_PAGE_CACHE_BUSY, null))
+                .thenReturn(new PutMessageResult(PutMessageStatus.OS_PAGE_CACHE_BUSY, null))
+                .thenReturn(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null))
+                .thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
+
+        final CountDownLatch latch = new CountDownLatch(1);
+        new Thread(() -> {
+            try {
+                timerMessageStore.getDequeuePutMessageServices()[0].run();
+            } finally {
+                latch.countDown();
+            }
+        }).start();
+        latch.await(10, TimeUnit.SECONDS);
+
+        assertTrue(timerMessageStore.dequeuePutQueue.isEmpty());
+        verify(mockMessageStore, times(6)).putMessage(any(MessageExtBrokerInner.class));
+    }
+
     @Test
     public void testTimerFlowControl() throws Exception {
         String topic = "TimerTest_testTimerFlowControl";
 
         storeConfig.setTimerCongestNumEachSlot(100);
-        TimerMessageStore timerMessageStore = createTimerMessageStore(null);
+        TimerMessageStore timerMessageStore = createTimerMessageStore(null , false);
         timerMessageStore.load();
         timerMessageStore.start(true);
 
@@ -264,7 +317,7 @@ public class TimerMessageStoreTest {
 
         String topic = "TimerTest_testPutExpiredTimerMessage";
 
-        TimerMessageStore timerMessageStore = createTimerMessageStore(null);
+        TimerMessageStore timerMessageStore = createTimerMessageStore(null ,false);
         timerMessageStore.load();
         timerMessageStore.start(true);
 
@@ -288,7 +341,7 @@ public class TimerMessageStoreTest {
     public void testDeleteTimerMessage() throws Exception {
         String topic = "TimerTest_testDeleteTimerMessage";
 
-        TimerMessageStore timerMessageStore = createTimerMessageStore(null);
+        TimerMessageStore timerMessageStore = createTimerMessageStore(null ,false);
         timerMessageStore.load();
         timerMessageStore.start(true);
 
@@ -325,7 +378,7 @@ public class TimerMessageStoreTest {
     public void testPutDeleteTimerMessage() throws Exception {
         String topic = "TimerTest_testPutDeleteTimerMessage";
 
-        final TimerMessageStore timerMessageStore = createTimerMessageStore(null);
+        final TimerMessageStore timerMessageStore = createTimerMessageStore(null , false);
         timerMessageStore.load();
         timerMessageStore.start(true);
 
@@ -372,7 +425,7 @@ public class TimerMessageStoreTest {
         final String topic = "TimerTest_testStateAndRecover";
 
         String base = StoreTestUtils.createBaseDir();
-        final TimerMessageStore first = createTimerMessageStore(base);
+        final TimerMessageStore first = createTimerMessageStore(base , false);
         first.load();
         first.start(true);
 
@@ -417,7 +470,7 @@ public class TimerMessageStoreTest {
         first.getTimerWheel().flush();
         first.shutdown();
 
-        final TimerMessageStore second = createTimerMessageStore(base);
+        final TimerMessageStore second = createTimerMessageStore(base , false);
         second.debug = true;
         assertTrue(second.load());
         assertEquals(msgNum, second.getQueueOffset());
@@ -446,7 +499,7 @@ public class TimerMessageStoreTest {
     public void testMaxDelaySec() throws Exception {
         String topic = "TimerTest_testMaxDelaySec";
 
-        TimerMessageStore first = createTimerMessageStore(null);
+        TimerMessageStore first = createTimerMessageStore(null , false);
         first.load();
         first.start(true);
 
@@ -468,7 +521,7 @@ public class TimerMessageStoreTest {
         storeConfig.setTimerRollWindowSlot(2);
         String topic = "TimerTest_testRollMessage";
 
-        TimerMessageStore timerMessageStore = createTimerMessageStore(null);
+        TimerMessageStore timerMessageStore = createTimerMessageStore(null , false);
         timerMessageStore.load();
         timerMessageStore.start(true);
 

Reply via email to