This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new d1fd7af3f1 [ISSUE #8979] Add configurable switch for timer message retry logic (#8980) d1fd7af3f1 is described below commit d1fd7af3f12e1bfa30c7baa7c3f687168a9f5dbf Author: 小陈 <jamiechen....@gmail.com> AuthorDate: Thu Dec 5 17:49:03 2024 +0800 [ISSUE #8979] Add configurable switch for timer message retry logic (#8980) --- .../rocketmq/store/config/MessageStoreConfig.java | 9 ++ .../rocketmq/store/timer/TimerMessageStore.java | 121 ++++++++++++--------- .../store/timer/TimerMessageStoreTest.java | 87 ++++++++++++--- 3 files changed, 149 insertions(+), 68 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 6dfdc0b1c8..0ea5841548 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -99,6 +99,7 @@ public class MessageStoreConfig { private boolean timerSkipUnknownError = false; private boolean timerWarmEnable = false; private boolean timerStopDequeue = false; + private boolean timerEnableRetryUntilSuccess = false; private int timerCongestNumEachSlot = Integer.MAX_VALUE; private int timerMetricSmallThreshold = 1000000; @@ -1689,6 +1690,14 @@ public class MessageStoreConfig { this.timerSkipUnknownError = timerSkipUnknownError; } + public boolean isTimerEnableRetryUntilSuccess() { + return timerEnableRetryUntilSuccess; + } + + public void setTimerEnableRetryUntilSuccess(boolean timerEnableRetryUntilSuccess) { + this.timerEnableRetryUntilSuccess = timerEnableRetryUntilSuccess; + } + public boolean isTimerWarmEnable() { return timerWarmEnable; } diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java index fb166678e6..2b14618eed 100644 --- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java @@ -1097,46 +1097,44 @@ public class TimerMessageStore { putMessageResult = messageStore.putMessage(message); } - int retryNum = 0; - while (retryNum < 3) { - if (null == putMessageResult || null == putMessageResult.getPutMessageStatus()) { - retryNum++; - } else { - switch (putMessageResult.getPutMessageStatus()) { - case PUT_OK: - if (brokerStatsManager != null) { - this.brokerStatsManager.incTopicPutNums(message.getTopic(), 1, 1); - if (putMessageResult.getAppendMessageResult() != null) { - this.brokerStatsManager.incTopicPutSize(message.getTopic(), - putMessageResult.getAppendMessageResult().getWroteBytes()); - } - this.brokerStatsManager.incBrokerPutNums(message.getTopic(), 1); + if (putMessageResult != null && putMessageResult.getPutMessageStatus() != null) { + switch (putMessageResult.getPutMessageStatus()) { + case PUT_OK: + if (brokerStatsManager != null) { + brokerStatsManager.incTopicPutNums(message.getTopic(), 1, 1); + if (putMessageResult.getAppendMessageResult() != null) { + brokerStatsManager.incTopicPutSize(message.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes()); } - return PUT_OK; - case SERVICE_NOT_AVAILABLE: - return PUT_NEED_RETRY; - case MESSAGE_ILLEGAL: - case PROPERTIES_SIZE_EXCEEDED: + brokerStatsManager.incBrokerPutNums(message.getTopic(), 1); + } + return PUT_OK; + + case MESSAGE_ILLEGAL: + case PROPERTIES_SIZE_EXCEEDED: + case WHEEL_TIMER_NOT_ENABLE: + case WHEEL_TIMER_MSG_ILLEGAL: + return PUT_NO_RETRY; + + case SERVICE_NOT_AVAILABLE: + case FLUSH_DISK_TIMEOUT: + case FLUSH_SLAVE_TIMEOUT: + case OS_PAGE_CACHE_BUSY: + case CREATE_MAPPED_FILE_FAILED: + case SLAVE_NOT_AVAILABLE: + return PUT_NEED_RETRY; + + case UNKNOWN_ERROR: + default: + if (storeConfig.isTimerSkipUnknownError()) { + LOGGER.warn("Skipping message due to unknown error, msg: {}", message); return PUT_NO_RETRY; - case CREATE_MAPPED_FILE_FAILED: - case FLUSH_DISK_TIMEOUT: - case FLUSH_SLAVE_TIMEOUT: - case OS_PAGE_CACHE_BUSY: - case SLAVE_NOT_AVAILABLE: - case UNKNOWN_ERROR: - default: - retryNum++; - } - } - Thread.sleep(50); - if (escapeBridgeHook != null) { - putMessageResult = escapeBridgeHook.apply(message); - } else { - putMessageResult = messageStore.putMessage(message); + } else { + holdMomentForUnknownError(); + return PUT_NEED_RETRY; + } } - LOGGER.warn("Retrying to do put timer msg retryNum:{} putRes:{} msg:{}", retryNum, putMessageResult, message); } - return PUT_NO_RETRY; + return PUT_NEED_RETRY; } public MessageExtBrokerInner convertMessage(MessageExt msgExt, boolean needRoll) { @@ -1471,7 +1469,6 @@ public class TimerMessageStore { } public class TimerDequeuePutMessageService extends AbstractStateService { - @Override public String getServiceName() { return getServiceThreadName() + this.getClass().getSimpleName(); @@ -1481,6 +1478,7 @@ public class TimerMessageStore { public void run() { setState(AbstractStateService.START); TimerMessageStore.LOGGER.info(this.getServiceName() + " service start"); + while (!this.isStopped() || dequeuePutQueue.size() != 0) { try { setState(AbstractStateService.WAITING); @@ -1488,41 +1486,63 @@ public class TimerMessageStore { if (null == tr) { continue; } + setState(AbstractStateService.RUNNING); - boolean doRes = false; boolean tmpDequeueChangeFlag = false; + try { - while (!isStopped() && !doRes) { + while (!isStopped()) { if (!isRunningDequeue()) { dequeueStatusChangeFlag = true; tmpDequeueChangeFlag = true; break; } + try { perfCounterTicks.startTick(DEQUEUE_PUT); + MessageExt msgExt = tr.getMsg(); DefaultStoreMetricsManager.incTimerDequeueCount(getRealTopic(msgExt)); + if (tr.getEnqueueTime() == Long.MAX_VALUE) { - // never enqueue, mark it. + // Never enqueue, mark it. MessageAccessor.putProperty(msgExt, TIMER_ENQUEUE_MS, String.valueOf(Long.MAX_VALUE)); } + addMetric(msgExt, -1); MessageExtBrokerInner msg = convert(msgExt, tr.getEnqueueTime(), needRoll(tr.getMagic())); - doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic())); - while (!doRes && !isStopped()) { - if (!isRunningDequeue()) { - dequeueStatusChangeFlag = true; - tmpDequeueChangeFlag = true; - break; + + boolean processed = false; + int retryCount = 0; + + while (!processed && !isStopped()) { + int result = doPut(msg, needRoll(tr.getMagic())); + + if (result == PUT_OK) { + processed = true; + } else if (result == PUT_NO_RETRY) { + TimerMessageStore.LOGGER.warn("Skipping message due to unrecoverable error. Msg: {}", msg); + processed = true; + } else { + retryCount++; + // Without enabling TimerEnableRetryUntilSuccess, messages will retry up to 3 times before being discarded + if (!storeConfig.isTimerEnableRetryUntilSuccess() && retryCount >= 3) { + TimerMessageStore.LOGGER.error("Message processing failed after {} retries. Msg: {}", retryCount, msg); + processed = true; + } else { + Thread.sleep(500L * precisionMs / 1000); + TimerMessageStore.LOGGER.warn("Retrying to process message. Retry count: {}, Msg: {}", retryCount, msg); + } } - doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic())); - Thread.sleep(500L * precisionMs / 1000); } + perfCounterTicks.endTick(DEQUEUE_PUT); + break; + } catch (Throwable t) { - LOGGER.info("Unknown error", t); + TimerMessageStore.LOGGER.info("Unknown error", t); if (storeConfig.isTimerSkipUnknownError()) { - doRes = true; + break; } else { holdMomentForUnknownError(); } @@ -1531,7 +1551,6 @@ public class TimerMessageStore { } finally { tr.idempotentRelease(!tmpDequeueChangeFlag); } - } catch (Throwable e) { TimerMessageStore.LOGGER.error("Error occurred in " + getServiceName(), e); } diff --git a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java index 4ce3985f6c..52e58efde2 100644 --- a/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.store.timer; import java.io.File; import java.io.IOException; +import java.lang.reflect.Field; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -30,6 +31,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.common.BrokerConfig; @@ -40,23 +42,26 @@ import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.store.ConsumeQueue; -import org.apache.rocketmq.store.DefaultMessageStore; -import org.apache.rocketmq.store.GetMessageResult; -import org.apache.rocketmq.store.GetMessageStatus; -import org.apache.rocketmq.store.MessageArrivingListener; import org.apache.rocketmq.common.message.MessageExtBrokerInner; import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.stats.BrokerStatsManager; +import org.apache.rocketmq.store.ConsumeQueue; +import org.apache.rocketmq.store.AppendMessageStatus; +import org.apache.rocketmq.store.AppendMessageResult; +import org.apache.rocketmq.store.GetMessageResult; +import org.apache.rocketmq.store.GetMessageStatus; +import org.apache.rocketmq.store.MessageArrivingListener; import org.junit.After; import org.junit.Assert; import org.junit.Assume; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; @@ -65,10 +70,16 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.times; public class TimerMessageStoreTest { private final byte[] msgBody = new byte[1024]; private static MessageStore messageStore; + private MessageStore mockMessageStore; private SocketAddress bornHost; private SocketAddress storeHost; @@ -100,21 +111,23 @@ public class TimerMessageStoreTest { storeConfig.setTimerInterceptDelayLevel(true); storeConfig.setTimerPrecisionMs(precisionMs); + mockMessageStore = Mockito.mock(MessageStore.class); messageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("TimerTest",false), new MyMessageArrivingListener(), new BrokerConfig(), new ConcurrentHashMap<>()); boolean load = messageStore.load(); assertTrue(load); messageStore.start(); } - public TimerMessageStore createTimerMessageStore(String rootDir) throws IOException { + public TimerMessageStore createTimerMessageStore(String rootDir , boolean needMock) throws IOException { if (null == rootDir) { rootDir = StoreTestUtils.createBaseDir(); } TimerCheckpoint timerCheckpoint = new TimerCheckpoint(rootDir + File.separator + "config" + File.separator + "timercheck"); TimerMetrics timerMetrics = new TimerMetrics(rootDir + File.separator + "config" + File.separator + "timermetrics"); - TimerMessageStore timerMessageStore = new TimerMessageStore(messageStore, storeConfig, timerCheckpoint, timerMetrics, null); - messageStore.setTimerMessageStore(timerMessageStore); + MessageStore ms = needMock ? mockMessageStore : messageStore; + TimerMessageStore timerMessageStore = new TimerMessageStore(ms, storeConfig, timerCheckpoint, timerMetrics, null); + ms.setTimerMessageStore(timerMessageStore); baseDirs.add(rootDir); timerStores.add(timerMessageStore); @@ -170,7 +183,7 @@ public class TimerMessageStoreTest { Assume.assumeFalse(MixAll.isWindows()); String topic = "TimerTest_testPutTimerMessage"; - final TimerMessageStore timerMessageStore = createTimerMessageStore(null); + final TimerMessageStore timerMessageStore = createTimerMessageStore(null , false); timerMessageStore.load(); timerMessageStore.start(true); @@ -212,12 +225,52 @@ public class TimerMessageStoreTest { } } + @Test + public void testRetryUntilSuccess() throws Exception { + storeConfig.setTimerEnableRetryUntilSuccess(true); + TimerMessageStore timerMessageStore = createTimerMessageStore(null , true); + timerMessageStore.load(); + timerMessageStore.setShouldRunningDequeue(true); + Field stateField = TimerMessageStore.class.getDeclaredField("state"); + stateField.setAccessible(true); + stateField.set(timerMessageStore, TimerMessageStore.RUNNING); + + MessageExtBrokerInner msg = buildMessage(3000L, "TestRetry", true); + transformTimerMessage(timerMessageStore, msg); + TimerRequest timerRequest = new TimerRequest(100, 200, 3000, System.currentTimeMillis(), 0, msg); + boolean offered = timerMessageStore.dequeuePutQueue.offer(timerRequest); + assertTrue(offered); + assertFalse(timerMessageStore.dequeuePutQueue.isEmpty()); + + // If enableRetryUntilSuccess is set and putMessage return NEED_RETRY type, the message should be retried until success. + when(mockMessageStore.putMessage(any(MessageExtBrokerInner.class))) + .thenReturn(new PutMessageResult(PutMessageStatus.FLUSH_DISK_TIMEOUT, null)) + .thenReturn(new PutMessageResult(PutMessageStatus.FLUSH_SLAVE_TIMEOUT, null)) + .thenReturn(new PutMessageResult(PutMessageStatus.OS_PAGE_CACHE_BUSY, null)) + .thenReturn(new PutMessageResult(PutMessageStatus.OS_PAGE_CACHE_BUSY, null)) + .thenReturn(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null)) + .thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK))); + + final CountDownLatch latch = new CountDownLatch(1); + new Thread(() -> { + try { + timerMessageStore.getDequeuePutMessageServices()[0].run(); + } finally { + latch.countDown(); + } + }).start(); + latch.await(10, TimeUnit.SECONDS); + + assertTrue(timerMessageStore.dequeuePutQueue.isEmpty()); + verify(mockMessageStore, times(6)).putMessage(any(MessageExtBrokerInner.class)); + } + @Test public void testTimerFlowControl() throws Exception { String topic = "TimerTest_testTimerFlowControl"; storeConfig.setTimerCongestNumEachSlot(100); - TimerMessageStore timerMessageStore = createTimerMessageStore(null); + TimerMessageStore timerMessageStore = createTimerMessageStore(null , false); timerMessageStore.load(); timerMessageStore.start(true); @@ -264,7 +317,7 @@ public class TimerMessageStoreTest { String topic = "TimerTest_testPutExpiredTimerMessage"; - TimerMessageStore timerMessageStore = createTimerMessageStore(null); + TimerMessageStore timerMessageStore = createTimerMessageStore(null ,false); timerMessageStore.load(); timerMessageStore.start(true); @@ -288,7 +341,7 @@ public class TimerMessageStoreTest { public void testDeleteTimerMessage() throws Exception { String topic = "TimerTest_testDeleteTimerMessage"; - TimerMessageStore timerMessageStore = createTimerMessageStore(null); + TimerMessageStore timerMessageStore = createTimerMessageStore(null ,false); timerMessageStore.load(); timerMessageStore.start(true); @@ -325,7 +378,7 @@ public class TimerMessageStoreTest { public void testPutDeleteTimerMessage() throws Exception { String topic = "TimerTest_testPutDeleteTimerMessage"; - final TimerMessageStore timerMessageStore = createTimerMessageStore(null); + final TimerMessageStore timerMessageStore = createTimerMessageStore(null , false); timerMessageStore.load(); timerMessageStore.start(true); @@ -372,7 +425,7 @@ public class TimerMessageStoreTest { final String topic = "TimerTest_testStateAndRecover"; String base = StoreTestUtils.createBaseDir(); - final TimerMessageStore first = createTimerMessageStore(base); + final TimerMessageStore first = createTimerMessageStore(base , false); first.load(); first.start(true); @@ -417,7 +470,7 @@ public class TimerMessageStoreTest { first.getTimerWheel().flush(); first.shutdown(); - final TimerMessageStore second = createTimerMessageStore(base); + final TimerMessageStore second = createTimerMessageStore(base , false); second.debug = true; assertTrue(second.load()); assertEquals(msgNum, second.getQueueOffset()); @@ -446,7 +499,7 @@ public class TimerMessageStoreTest { public void testMaxDelaySec() throws Exception { String topic = "TimerTest_testMaxDelaySec"; - TimerMessageStore first = createTimerMessageStore(null); + TimerMessageStore first = createTimerMessageStore(null , false); first.load(); first.start(true); @@ -468,7 +521,7 @@ public class TimerMessageStoreTest { storeConfig.setTimerRollWindowSlot(2); String topic = "TimerTest_testRollMessage"; - TimerMessageStore timerMessageStore = createTimerMessageStore(null); + TimerMessageStore timerMessageStore = createTimerMessageStore(null , false); timerMessageStore.load(); timerMessageStore.start(true);