This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d66cfa9d7a [ISSUE #10165] Prevent RocksDB metrics from being 
overwritten after timer engine switch (#10166)
d66cfa9d7a is described below

commit d66cfa9d7a448521f8d86964fa861b605f5c1cc3
Author: hqbfz <[email protected]>
AuthorDate: Mon Mar 30 16:48:33 2026 +0800

    [ISSUE #10165] Prevent RocksDB metrics from being overwritten after timer 
engine switch (#10166)
---
 .../rocketmq/store/timer/TimerMessageStore.java    |   3 +
 .../store/timer/TimerEngineSwitchVerifyTest.java   | 679 +++++++++++++++++++++
 2 files changed, 682 insertions(+)

diff --git 
a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index 390dec9f98..157f237f7b 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -526,6 +526,9 @@ public class TimerMessageStore {
             public void run() {
                 try {
                     if (storeConfig.isTimerEnableCheckMetrics()) {
+                        if (storeConfig.isTimerStopEnqueue()) {
+                            return;
+                        }
                         String when = storeConfig.getTimerCheckMetricsWhen();
                         if (!UtilAll.isItTimeToDo(when)) {
                             return;
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/timer/TimerEngineSwitchVerifyTest.java
 
b/store/src/test/java/org/apache/rocketmq/store/timer/TimerEngineSwitchVerifyTest.java
new file mode 100644
index 0000000000..9429af4712
--- /dev/null
+++ 
b/store/src/test/java/org/apache/rocketmq/store/timer/TimerEngineSwitchVerifyTest.java
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.timer;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.MessageArrivingListener;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
+import org.apache.rocketmq.store.config.FlushDiskType;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Fix verification test: verifies that after adding the timerStopEnqueue 
guard in the scheduler task,
+ * checkAndReviseMetrics does not incorrectly overwrite RocksDB metrics during 
engine switching.
+ *
+ * Fix approach:
+ * In the checkAndReviseMetrics scheduled task registered in 
TimerMessageStore.start(),
+ * add a storeConfig.isTimerStopEnqueue() check: when the file-based engine 
has stopped enqueuing
+ * (indicating a switch to RocksDB), skip checkAndReviseMetrics to avoid 
traversing only timerLog
+ * and overwriting RocksDB-side metrics via putAll.
+ *
+ * This test class covers the following scenarios:
+ * 1. File-based mode: checkAndReviseMetrics works normally
+ * 2. After switching to RocksDB: RocksDB-only topic metrics are not 
overwritten
+ * 3. After switching to RocksDB: shared topic metrics are not overwritten
+ * 4. After switching back to file-based mode: checkAndReviseMetrics resumes 
normally
+ * 5. When scheduler auto-triggers: timerStopEnqueue=true skips 
checkAndReviseMetrics
+ * 6. Repeated engine switches: metrics consistency is always maintained
+ */
+public class TimerEngineSwitchVerifyTest {
+
+    private final byte[] msgBody = new byte[1024];
+    private MessageStore messageStore;
+    private SocketAddress bornHost;
+    private SocketAddress storeHost;
+    private final int precisionMs = 500;
+    private final Set<String> baseDirs = new HashSet<>();
+    private final List<TimerMessageStore> timerStores = new ArrayList<>();
+    private final AtomicInteger counter = new AtomicInteger(0);
+    private MessageStoreConfig storeConfig;
+
+    @Before
+    public void init() throws Exception {
+        String baseDir = StoreTestUtils.createBaseDir();
+        baseDirs.add(baseDir);
+
+        storeHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
+        bornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 
0);
+
+        storeConfig = new MessageStoreConfig();
+        storeConfig.setMappedFileSizeCommitLog(1024 * 1024 * 1024);
+        storeConfig.setMappedFileSizeTimerLog(1024 * 1024 * 1024);
+        storeConfig.setMappedFileSizeConsumeQueue(10240);
+        storeConfig.setMaxHashSlotNum(10000);
+        storeConfig.setMaxIndexNum(100 * 1000);
+        storeConfig.setStorePathRootDir(baseDir);
+        storeConfig.setStorePathCommitLog(baseDir + File.separator + 
"commitlog");
+        storeConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);
+        storeConfig.setTimerInterceptDelayLevel(true);
+        storeConfig.setTimerPrecisionMs(precisionMs);
+        storeConfig.setAppendTopicForTimerDeleteKey(false);
+        storeConfig.setTimerMetricSmallThreshold(1000000);
+
+        messageStore = new DefaultMessageStore(storeConfig, new 
BrokerStatsManager("TimerFixTest", false),
+                new MyMessageArrivingListener(), new BrokerConfig(), new 
ConcurrentHashMap<>());
+        boolean load = messageStore.load();
+        assertTrue(load);
+        messageStore.start();
+    }
+
+    private TimerMessageStore createTimerMessageStore(String rootDir) throws 
Exception {
+        if (null == rootDir) {
+            rootDir = StoreTestUtils.createBaseDir();
+        }
+        TimerCheckpoint timerCheckpoint = new TimerCheckpoint(
+                rootDir + File.separator + "config" + File.separator + 
"timercheck");
+        TimerMetrics timerMetrics = new TimerMetrics(
+                rootDir + File.separator + "config" + File.separator + 
"timermetrics");
+        TimerMessageStore timerMessageStore = new TimerMessageStore(
+                messageStore, storeConfig, timerCheckpoint, timerMetrics, 
null);
+        messageStore.setTimerMessageStore(timerMessageStore);
+
+        baseDirs.add(rootDir);
+        timerStores.add(timerMessageStore);
+        return timerMessageStore;
+    }
+
+    /**
+     * Scenario 1: In file-based mode (timerStopEnqueue=false), 
checkAndReviseMetrics works normally.
+     * Ensures the fix does not affect the original metrics correction 
capability of the file-based engine.
+     */
+    @Test
+    public void testFileMode_checkAndReviseMetrics_worksNormally() throws 
Exception {
+        Assume.assumeFalse(MixAll.isWindows());
+
+        final String topic = "FixVerify_FileMode_" + 
System.currentTimeMillis();
+        final int msgCount = 4;
+
+        final TimerMessageStore timerMessageStore = 
createTimerMessageStore(null);
+        timerMessageStore.load();
+        timerMessageStore.start(true);
+
+        // Ensure file-based mode
+        storeConfig.setTimerStopEnqueue(false);
+        storeConfig.setTimerRocksDBEnable(false);
+
+        // Write timer messages to timerLog
+        long delayMs = System.currentTimeMillis() / precisionMs * precisionMs 
+ 60000;
+        for (int i = 0; i < msgCount; i++) {
+            MessageExtBrokerInner inner = buildMessage(delayMs, topic, false);
+            transformTimerMessage(timerMessageStore, inner, storeConfig);
+            assertEquals(PutMessageStatus.PUT_OK, 
messageStore.putMessage(inner).getPutMessageStatus());
+        }
+
+        await().atMost(5000, TimeUnit.MILLISECONDS).until(new 
Callable<Boolean>() {
+            @Override
+            public Boolean call() {
+                return timerMessageStore.getCommitQueueOffset() >= msgCount;
+            }
+        });
+
+        TimerMetrics timerMetrics = timerMessageStore.getTimerMetrics();
+        assertEquals(msgCount, timerMetrics.getTimingCount(topic));
+
+        // Execute checkAndReviseMetrics -- should work normally in file-based 
mode
+        timerMessageStore.checkAndReviseMetrics();
+
+        // Verify: file-based metrics should remain correct (re-counted from 
timerLog)
+        assertEquals("checkAndReviseMetrics should correctly revise metrics in 
file-based mode",
+                msgCount, timerMetrics.getTimingCount(topic));
+    }
+
+    /**
+     * Scenario 2: After switching to RocksDB, RocksDB-only topic metrics are 
not overwritten.
+     *
+     * Verifies the fix: when timerStopEnqueue=true in the scheduler task, 
checkAndReviseMetrics is skipped,
+     * so RocksDB-only topic metrics are not overwritten to 0 by 
putAll(newSmallOnes).
+     */
+    @Test
+    public void testSwitchToRocksDB_rocksDBOnlyTopicPreserved() throws 
Exception {
+        Assume.assumeFalse(MixAll.isWindows());
+
+        final String fileTopic = "FixVerify_FileTopic_" + 
System.currentTimeMillis();
+        final String rocksdbTopic = "FixVerify_RocksDBTopic_" + 
System.currentTimeMillis();
+        final int fileMsgCount = 3;
+        final int rocksdbMsgCount = 6;
+
+        final TimerMessageStore timerMessageStore = 
createTimerMessageStore(null);
+        timerMessageStore.load();
+        timerMessageStore.start(true);
+
+        // Phase 1: Write messages in file-based mode
+        long delayMs = System.currentTimeMillis() / precisionMs * precisionMs 
+ 60000;
+        for (int i = 0; i < fileMsgCount; i++) {
+            MessageExtBrokerInner inner = buildMessage(delayMs, fileTopic, 
false);
+            transformTimerMessage(timerMessageStore, inner, storeConfig);
+            assertEquals(PutMessageStatus.PUT_OK, 
messageStore.putMessage(inner).getPutMessageStatus());
+        }
+
+        await().atMost(5000, TimeUnit.MILLISECONDS).until(new 
Callable<Boolean>() {
+            @Override
+            public Boolean call() {
+                return timerMessageStore.getCommitQueueOffset() >= 
fileMsgCount;
+            }
+        });
+
+        TimerMetrics timerMetrics = timerMessageStore.getTimerMetrics();
+        assertEquals(fileMsgCount, timerMetrics.getTimingCount(fileTopic));
+
+        // Phase 2: Simulate switchTimerEngine to RocksDB
+        storeConfig.setTimerStopEnqueue(true);
+        storeConfig.setTimerRocksDBEnable(true);
+
+        // Phase 3: Write new topic metrics from RocksDB side
+        for (int i = 0; i < rocksdbMsgCount; i++) {
+            MessageExt mockMsg = new MessageExt();
+            MessageAccessor.putProperty(mockMsg, 
MessageConst.PROPERTY_REAL_TOPIC, rocksdbTopic);
+            timerMetrics.addAndGet(mockMsg, 1);
+        }
+
+        assertEquals(fileMsgCount, timerMetrics.getTimingCount(fileTopic));
+        assertEquals(rocksdbMsgCount, 
timerMetrics.getTimingCount(rocksdbTopic));
+
+        // Phase 4: Simulate the fixed scheduler logic
+        // The fix adds a timerStopEnqueue check in the scheduler, preventing 
checkAndReviseMetrics from being called
+        boolean skipped = false;
+        if (storeConfig.isTimerStopEnqueue()) {
+            skipped = true;
+            // Do not call checkAndReviseMetrics
+        } else {
+            timerMessageStore.checkAndReviseMetrics();
+        }
+
+        // Phase 5: Verify the fix
+        assertTrue("Should skip checkAndReviseMetrics when 
timerStopEnqueue=true", skipped);
+        assertEquals("RocksDB-only topic metrics should not be overwritten",
+                rocksdbMsgCount, timerMetrics.getTimingCount(rocksdbTopic));
+        assertEquals("File-based topic metrics should remain unchanged",
+                fileMsgCount, timerMetrics.getTimingCount(fileTopic));
+    }
+
+    /**
+     * Scenario 3: After switching to RocksDB, shared topic (messages in both 
file-based and RocksDB) metrics are not overwritten.
+     *
+     * Before fix: checkAndReviseMetrics only counts file-based quantities 
from timerLog, putAll loses the RocksDB portion.
+     * After fix: checkAndReviseMetrics is skipped, all metrics remain 
unchanged.
+     */
+    @Test
+    public void testSwitchToRocksDB_sharedTopicPreserved() throws Exception {
+        Assume.assumeFalse(MixAll.isWindows());
+
+        final String sharedTopic = "FixVerify_SharedTopic_" + 
System.currentTimeMillis();
+        final int fileMsgCount = 2;
+        final int rocksdbMsgCount = 4;
+
+        final TimerMessageStore timerMessageStore = 
createTimerMessageStore(null);
+        timerMessageStore.load();
+        timerMessageStore.start(true);
+
+        // Phase 1: Write in file-based mode
+        long delayMs = System.currentTimeMillis() / precisionMs * precisionMs 
+ 60000;
+        for (int i = 0; i < fileMsgCount; i++) {
+            MessageExtBrokerInner inner = buildMessage(delayMs, sharedTopic, 
false);
+            transformTimerMessage(timerMessageStore, inner, storeConfig);
+            assertEquals(PutMessageStatus.PUT_OK, 
messageStore.putMessage(inner).getPutMessageStatus());
+        }
+
+        await().atMost(5000, TimeUnit.MILLISECONDS).until(new 
Callable<Boolean>() {
+            @Override
+            public Boolean call() {
+                return timerMessageStore.getCommitQueueOffset() >= 
fileMsgCount;
+            }
+        });
+
+        TimerMetrics timerMetrics = timerMessageStore.getTimerMetrics();
+        assertEquals(fileMsgCount, timerMetrics.getTimingCount(sharedTopic));
+
+        // Phase 2: Switch to RocksDB
+        storeConfig.setTimerStopEnqueue(true);
+        storeConfig.setTimerRocksDBEnable(true);
+
+        // Phase 3: RocksDB continues to increment the count for the same topic
+        for (int i = 0; i < rocksdbMsgCount; i++) {
+            MessageExt mockMsg = new MessageExt();
+            MessageAccessor.putProperty(mockMsg, 
MessageConst.PROPERTY_REAL_TOPIC, sharedTopic);
+            timerMetrics.addAndGet(mockMsg, 1);
+        }
+
+        long totalBefore = timerMetrics.getTimingCount(sharedTopic);
+        assertEquals(fileMsgCount + rocksdbMsgCount, totalBefore);
+
+        // Phase 4: Simulate the fixed scheduler
+        if (storeConfig.isTimerStopEnqueue()) {
+            // Skip
+        } else {
+            timerMessageStore.checkAndReviseMetrics();
+        }
+
+        // Phase 5: Verify
+        long totalAfter = timerMetrics.getTimingCount(sharedTopic);
+        assertEquals("Shared topic metrics should not be overwritten 
(file-based " + fileMsgCount + " + RocksDB " + rocksdbMsgCount + ")",
+                fileMsgCount + rocksdbMsgCount, totalAfter);
+    }
+
+    /**
+     * Scenario 4: After switching back from RocksDB to file-based mode, 
checkAndReviseMetrics should resume normally.
+     *
+     * Simulated flow:
+     * 1. File-based write -> switch to RocksDB -> RocksDB writes metrics
+     * 2. Switch back to file-based mode (timerStopEnqueue=false)
+     * 3. checkAndReviseMetrics resumes execution, normally revising metrics 
from timerLog
+     */
+    @Test
+    public void 
testSwitchBackToFileMode_checkAndReviseMetrics_resumesNormally() throws 
Exception {
+        Assume.assumeFalse(MixAll.isWindows());
+
+        final String fileTopic = "FixVerify_SwitchBack_" + 
System.currentTimeMillis();
+        final int fileMsgCount = 5;
+
+        final TimerMessageStore timerMessageStore = 
createTimerMessageStore(null);
+        timerMessageStore.load();
+        timerMessageStore.start(true);
+
+        // Phase 1: Write messages in file-based mode
+        long delayMs = System.currentTimeMillis() / precisionMs * precisionMs 
+ 60000;
+        for (int i = 0; i < fileMsgCount; i++) {
+            MessageExtBrokerInner inner = buildMessage(delayMs, fileTopic, 
false);
+            transformTimerMessage(timerMessageStore, inner, storeConfig);
+            assertEquals(PutMessageStatus.PUT_OK, 
messageStore.putMessage(inner).getPutMessageStatus());
+        }
+
+        await().atMost(5000, TimeUnit.MILLISECONDS).until(new 
Callable<Boolean>() {
+            @Override
+            public Boolean call() {
+                return timerMessageStore.getCommitQueueOffset() >= 
fileMsgCount;
+            }
+        });
+
+        TimerMetrics timerMetrics = timerMessageStore.getTimerMetrics();
+        assertEquals(fileMsgCount, timerMetrics.getTimingCount(fileTopic));
+
+        // Phase 2: Switch to RocksDB
+        storeConfig.setTimerStopEnqueue(true);
+        storeConfig.setTimerRocksDBEnable(true);
+
+        // Simulate the fixed scheduler -- should skip
+        boolean skippedInRocksDB = storeConfig.isTimerStopEnqueue();
+        assertTrue("Should skip checkAndReviseMetrics in RocksDB mode", 
skippedInRocksDB);
+
+        // Phase 3: Switch back to file-based mode (simulating 
switchTimerEngine(FILE_TIMELINE))
+        storeConfig.setTimerStopEnqueue(false);
+        storeConfig.setTimerRocksDBEnable(false);
+
+        // Phase 4: After switching back, checkAndReviseMetrics resumes 
execution
+        boolean skippedInFileMode = storeConfig.isTimerStopEnqueue();
+
+        if (!skippedInFileMode) {
+            timerMessageStore.checkAndReviseMetrics();
+        }
+
+        // Phase 5: Verify metrics are correct after switching back
+        assertEquals("After switching back to file-based mode, 
checkAndReviseMetrics should correctly revise metrics from timerLog",
+                fileMsgCount, timerMetrics.getTimingCount(fileTopic));
+    }
+
+    /**
+     * Scenario 5: When the scheduler auto-triggers, the fixed 
timerStopEnqueue guard correctly skips checkAndReviseMetrics.
+     *
+     * Uses reflection to obtain the scheduler and registers a short-interval 
task (logic identical to the fixed start()),
+     * verifying that when timerStopEnqueue=true the scheduler correctly skips 
and RocksDB metrics are not overwritten.
+     */
+    @Test
+    public void testSchedulerAutoTrigger_skipsWhenSwitchedToRocksDB() throws 
Exception {
+        Assume.assumeFalse(MixAll.isWindows());
+
+        final String fileTopic = "FixVerify_Sched_File_" + 
System.currentTimeMillis();
+        final String rocksdbTopic = "FixVerify_Sched_RocksDB_" + 
System.currentTimeMillis();
+        final int fileMsgCount = 3;
+        final int rocksdbMsgCount = 8;
+
+        final TimerMessageStore timerMessageStore = 
createTimerMessageStore(null);
+        timerMessageStore.load();
+        timerMessageStore.start(true);
+
+        // Phase 1: Write in file-based mode
+        long delayMs = System.currentTimeMillis() / precisionMs * precisionMs 
+ 60000;
+        for (int i = 0; i < fileMsgCount; i++) {
+            MessageExtBrokerInner inner = buildMessage(delayMs, fileTopic, 
false);
+            transformTimerMessage(timerMessageStore, inner, storeConfig);
+            assertEquals(PutMessageStatus.PUT_OK, 
messageStore.putMessage(inner).getPutMessageStatus());
+        }
+
+        await().atMost(5000, TimeUnit.MILLISECONDS).until(new 
Callable<Boolean>() {
+            @Override
+            public Boolean call() {
+                return timerMessageStore.getCommitQueueOffset() >= 
fileMsgCount;
+            }
+        });
+
+        final TimerMetrics timerMetrics = timerMessageStore.getTimerMetrics();
+        assertEquals(fileMsgCount, timerMetrics.getTimingCount(fileTopic));
+
+        // Phase 2: Switch to RocksDB
+        storeConfig.setTimerStopEnqueue(true);
+        storeConfig.setTimerRocksDBEnable(true);
+
+        // Phase 3: Write metrics from RocksDB side
+        for (int i = 0; i < rocksdbMsgCount; i++) {
+            MessageExt mockMsg = new MessageExt();
+            MessageAccessor.putProperty(mockMsg, 
MessageConst.PROPERTY_REAL_TOPIC, rocksdbTopic);
+            timerMetrics.addAndGet(mockMsg, 1);
+        }
+
+        assertEquals(fileMsgCount, timerMetrics.getTimingCount(fileTopic));
+        assertEquals(rocksdbMsgCount, 
timerMetrics.getTimingCount(rocksdbTopic));
+
+        // Phase 4: Set conditions to trigger the scheduled task
+        Calendar now = Calendar.getInstance();
+        String currentHour = String.format("%02d", 
now.get(Calendar.HOUR_OF_DAY));
+        Field whenField = 
MessageStoreConfig.class.getDeclaredField("timerCheckMetricsWhen");
+        whenField.setAccessible(true);
+        whenField.set(storeConfig, currentHour);
+        timerMessageStore.lastTimeOfCheckMetrics = 0L;
+        storeConfig.setTimerEnableCheckMetrics(true);
+
+        // Phase 5: Obtain scheduler via reflection and register a 
short-interval task (logic identical to the fixed start())
+        Field schedulerField = 
TimerMessageStore.class.getDeclaredField("scheduler");
+        schedulerField.setAccessible(true);
+        ScheduledExecutorService scheduler = (ScheduledExecutorService) 
schedulerField.get(timerMessageStore);
+
+        final AtomicBoolean schedulerExecuted = new AtomicBoolean(false);
+        final AtomicBoolean wasSkipped = new AtomicBoolean(false);
+
+        scheduler.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    if (storeConfig.isTimerEnableCheckMetrics()) {
+                        // ★ Fix logic: skip when timerStopEnqueue=true
+                        if (storeConfig.isTimerStopEnqueue()) {
+                            wasSkipped.set(true);
+                            schedulerExecuted.set(true);
+                            return;
+                        }
+                        String when = storeConfig.getTimerCheckMetricsWhen();
+                        if (!UtilAll.isItTimeToDo(when)) {
+                            return;
+                        }
+                        long curr = System.currentTimeMillis();
+                        if (curr - timerMessageStore.lastTimeOfCheckMetrics > 
70 * 60 * 1000) {
+                            timerMessageStore.lastTimeOfCheckMetrics = curr;
+                            timerMessageStore.checkAndReviseMetrics();
+                            wasSkipped.set(false);
+                            schedulerExecuted.set(true);
+                        }
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }, 1, 1, TimeUnit.SECONDS);
+
+        // Phase 6: Wait for the scheduler to execute
+        await().atMost(10000, TimeUnit.MILLISECONDS).until(new 
Callable<Boolean>() {
+            @Override
+            public Boolean call() {
+                return schedulerExecuted.get();
+            }
+        });
+
+        // Phase 7: Verify
+        assertTrue("Scheduler should skip checkAndReviseMetrics due to 
timerStopEnqueue=true", wasSkipped.get());
+        assertEquals("RocksDB topic metrics should not be overwritten",
+                rocksdbMsgCount, timerMetrics.getTimingCount(rocksdbTopic));
+        assertEquals("File-based topic metrics should remain unchanged",
+                fileMsgCount, timerMetrics.getTimingCount(fileTopic));
+    }
+
+    /**
+     * Scenario 6: Repeated engine switches, verifying metrics consistency.
+     *
+     * Flow:
+     * 1. File-based write -> checkAndReviseMetrics normal -> metrics correct
+     * 2. Switch to RocksDB -> RocksDB write -> scheduler skips -> all metrics 
preserved
+     * 3. Switch back to file-based -> checkAndReviseMetrics resumes -> 
file-based metrics correctly revised
+     * 4. Switch to RocksDB again -> new RocksDB metrics written -> scheduler 
skips -> all metrics preserved
+     */
+    @Test
+    public void testRepeatedSwitch_metricsConsistency() throws Exception {
+        Assume.assumeFalse(MixAll.isWindows());
+
+        final String fileTopic = "FixVerify_Repeat_File_" + 
System.currentTimeMillis();
+        final String rocksdbTopic1 = "FixVerify_Repeat_RDB1_" + 
System.currentTimeMillis();
+        final String rocksdbTopic2 = "FixVerify_Repeat_RDB2_" + 
System.currentTimeMillis();
+        final int fileMsgCount = 3;
+        final int rocksdb1MsgCount = 4;
+        final int rocksdb2MsgCount = 5;
+
+        final TimerMessageStore timerMessageStore = 
createTimerMessageStore(null);
+        timerMessageStore.load();
+        timerMessageStore.start(true);
+
+        TimerMetrics timerMetrics = timerMessageStore.getTimerMetrics();
+
+        // ========== Round 1: Normal write in file-based mode ==========
+        long delayMs = System.currentTimeMillis() / precisionMs * precisionMs 
+ 60000;
+        for (int i = 0; i < fileMsgCount; i++) {
+            MessageExtBrokerInner inner = buildMessage(delayMs, fileTopic, 
false);
+            transformTimerMessage(timerMessageStore, inner, storeConfig);
+            assertEquals(PutMessageStatus.PUT_OK, 
messageStore.putMessage(inner).getPutMessageStatus());
+        }
+
+        await().atMost(5000, TimeUnit.MILLISECONDS).until(new 
Callable<Boolean>() {
+            @Override
+            public Boolean call() {
+                return timerMessageStore.getCommitQueueOffset() >= 
fileMsgCount;
+            }
+        });
+
+        assertEquals(fileMsgCount, timerMetrics.getTimingCount(fileTopic));
+
+        // Execute checkAndReviseMetrics in file-based mode
+        if (!storeConfig.isTimerStopEnqueue()) {
+            timerMessageStore.checkAndReviseMetrics();
+        }
+        assertEquals("Round 1: File-based metrics normal", fileMsgCount, 
timerMetrics.getTimingCount(fileTopic));
+
+        // ========== Round 2: Switch to RocksDB ==========
+        storeConfig.setTimerStopEnqueue(true);
+        storeConfig.setTimerRocksDBEnable(true);
+
+        // RocksDB writes the first batch of new topic
+        for (int i = 0; i < rocksdb1MsgCount; i++) {
+            MessageExt mockMsg = new MessageExt();
+            MessageAccessor.putProperty(mockMsg, 
MessageConst.PROPERTY_REAL_TOPIC, rocksdbTopic1);
+            timerMetrics.addAndGet(mockMsg, 1);
+        }
+
+        // Fixed scheduler skips
+        if (!storeConfig.isTimerStopEnqueue()) {
+            timerMessageStore.checkAndReviseMetrics();
+        }
+
+        assertEquals("Round 2: File-based metrics preserved", fileMsgCount, 
timerMetrics.getTimingCount(fileTopic));
+        assertEquals("Round 2: RocksDB Topic1 metrics preserved", 
rocksdb1MsgCount, timerMetrics.getTimingCount(rocksdbTopic1));
+
+        // ========== Round 3: Switch back to file-based mode ==========
+        storeConfig.setTimerStopEnqueue(false);
+        storeConfig.setTimerRocksDBEnable(false);
+
+        // Execute checkAndReviseMetrics after switching back
+        if (!storeConfig.isTimerStopEnqueue()) {
+            timerMessageStore.checkAndReviseMetrics();
+        }
+
+        // Note: After switching back to file-based mode, 
checkAndReviseMetrics re-counts from timerLog.
+        // RocksDB's Topic1 is not in timerLog and will be overwritten to 0 by 
putAll.
+        // This is the expected behavior after switching back -- file-based 
mode only manages file-based data.
+        assertEquals("Round 3: File-based metrics correct (revised from 
timerLog)",
+                fileMsgCount, timerMetrics.getTimingCount(fileTopic));
+
+        // ========== Round 4: Switch to RocksDB again ==========
+        storeConfig.setTimerStopEnqueue(true);
+        storeConfig.setTimerRocksDBEnable(true);
+
+        // RocksDB writes the second batch of new topic
+        for (int i = 0; i < rocksdb2MsgCount; i++) {
+            MessageExt mockMsg = new MessageExt();
+            MessageAccessor.putProperty(mockMsg, 
MessageConst.PROPERTY_REAL_TOPIC, rocksdbTopic2);
+            timerMetrics.addAndGet(mockMsg, 1);
+        }
+
+        // Fixed scheduler skips
+        if (!storeConfig.isTimerStopEnqueue()) {
+            timerMessageStore.checkAndReviseMetrics();
+        }
+
+        assertEquals("Round 4: File-based metrics preserved", fileMsgCount, 
timerMetrics.getTimingCount(fileTopic));
+        assertEquals("Round 4: RocksDB Topic2 metrics preserved", 
rocksdb2MsgCount, timerMetrics.getTimingCount(rocksdbTopic2));
+    }
+
+    // ======================== Utility Methods ========================
+
+    private static PutMessageResult transformTimerMessage(TimerMessageStore 
timerMessageStore,
+            MessageExtBrokerInner msg, MessageStoreConfig storeConfig) {
+        int delayLevel = msg.getDelayTimeLevel();
+        long deliverMs;
+        try {
+            if (msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_SEC) != 
null) {
+                deliverMs = System.currentTimeMillis()
+                        + 
Integer.parseInt(msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_SEC)) * 1000;
+            } else if (msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_MS) 
!= null) {
+                deliverMs = System.currentTimeMillis()
+                        + 
Integer.parseInt(msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_MS));
+            } else {
+                deliverMs = 
Long.parseLong(msg.getProperty(MessageConst.PROPERTY_TIMER_DELIVER_MS));
+            }
+        } catch (Exception e) {
+            return new 
PutMessageResult(PutMessageStatus.WHEEL_TIMER_MSG_ILLEGAL, null);
+        }
+        if (deliverMs > System.currentTimeMillis()) {
+            if (delayLevel <= 0 && deliverMs - System.currentTimeMillis() > 
storeConfig.getTimerMaxDelaySec() * 1000L) {
+                return new 
PutMessageResult(PutMessageStatus.WHEEL_TIMER_MSG_ILLEGAL, null);
+            }
+            int timerPrecisionMs = storeConfig.getTimerPrecisionMs();
+            if (deliverMs % timerPrecisionMs == 0) {
+                deliverMs -= timerPrecisionMs;
+            } else {
+                deliverMs = deliverMs / timerPrecisionMs * timerPrecisionMs;
+            }
+            MessageAccessor.putProperty(msg, 
MessageConst.PROPERTY_TIMER_OUT_MS, deliverMs + "");
+            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, 
msg.getTopic());
+            MessageAccessor.putProperty(msg, 
MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
+            
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
+            msg.setTopic(TimerMessageStore.TIMER_TOPIC);
+            msg.setQueueId(0);
+        }
+        return null;
+    }
+
+    private MessageExtBrokerInner buildMessage(long delayedMs, String topic, 
boolean relative) {
+        MessageExtBrokerInner msg = new MessageExtBrokerInner();
+        msg.setTopic(topic);
+        msg.setQueueId(0);
+        msg.setTags(counter.incrementAndGet() + "");
+        msg.setKeys("timer");
+        if (relative) {
+            MessageAccessor.putProperty(msg, 
MessageConst.PROPERTY_TIMER_DELAY_SEC, delayedMs / 1000 + "");
+        } else {
+            MessageAccessor.putProperty(msg, 
MessageConst.PROPERTY_TIMER_DELIVER_MS, delayedMs + "");
+        }
+        msg.setBody(msgBody);
+        msg.setKeys(String.valueOf(System.currentTimeMillis()));
+        msg.setQueueId(0);
+        msg.setBornTimestamp(System.currentTimeMillis());
+        msg.setBornHost(bornHost);
+        msg.setStoreHost(storeHost);
+        MessageClientIDSetter.setUniqID(msg);
+        TopicFilterType topicFilterType = 
MessageExt.parseTopicFilterType(msg.getSysFlag());
+        long tagsCodeValue =
+                MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, 
msg.getTags());
+        msg.setTagsCode(tagsCodeValue);
+        
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
+        return msg;
+    }
+
+    private class MyMessageArrivingListener implements MessageArrivingListener 
{
+        @Override
+        public void arriving(String topic, int queueId, long logicOffset, long 
tagsCode, long msgStoreTime,
+                byte[] filterBitMap, Map<String, String> properties) {
+        }
+    }
+
+    @After
+    public void clear() {
+        for (TimerMessageStore store : timerStores) {
+            store.shutdown();
+        }
+        for (String baseDir : baseDirs) {
+            StoreTestUtils.deleteFile(baseDir);
+        }
+        if (null != messageStore) {
+            messageStore.shutdown();
+            messageStore.destroy();
+        }
+    }
+}

Reply via email to