This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9a891f1d49 fix: multiple patches during long running tests for LMQ over RocksDB (#8915)
9a891f1d49 is described below

commit 9a891f1d49af0cfe4e384bc28a0bf0eeed02587f
Author: Zhanhui Li <lizhan...@apache.org>
AuthorDate: Mon Dec 2 10:07:54 2024 +0800

    fix: multiple patches during long running tests for LMQ over RocksDB (#8915)
    
    * fix: multiple patches during long running tests for LMQ over RocksDB
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * fix: fix a bug in RocksGroupCommitService; remove RocksDBConsumeQueueStore#findConsumeQueueMap override
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * fix: async fsync on RocksDB WAL flush
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * fix: use a dedicated thread to flush and sync RocksDB WAL
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * fix: trigger WAL rolling according to estimated WAL file size
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * chore: add doc, explaining config RocksDB instance flush/sync strategy
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * fix: data-version should be per table
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * fix: test case: RocksdbTransferOffsetAndCqTest
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    ---------
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
---
 .../apache/rocketmq/broker/BrokerController.java   |   2 +-
 .../rocketmq/broker/config/v2/ConfigHelper.java    |   4 +-
 .../rocketmq/broker/config/v2/ConfigStorage.java   | 156 ++++++++++++++++++++-
 .../broker/config/v2/ConsumerOffsetManagerV2.java  |   8 +-
 .../config/v2/SubscriptionGroupManagerV2.java      |   6 +-
 .../broker/config/v2/TopicConfigManagerV2.java     |   6 +-
 .../config/v2/ConsumerOffsetManagerV2Test.java     |  19 ++-
 .../config/v2/SubscriptionGroupManagerV2Test.java  |  15 +-
 .../broker/config/v2/TopicConfigManagerV2Test.java |  25 +++-
 .../offset/RocksdbTransferOffsetAndCqTest.java     |  16 ++-
 .../common/config/AbstractRocksDBStorage.java      |  34 ++---
 .../rocketmq/common/config/ConfigHelper.java       |  32 +++--
 .../common/config/ConfigRocksDBStorage.java        |   2 +-
 .../java/org/apache/rocketmq/store/CommitLog.java  |  16 ++-
 .../rocketmq/store/config/MessageStoreConfig.java  |  24 ++++
 .../store/queue/RocksDBConsumeQueueStore.java      |  95 +++++++++----
 .../store/queue/RocksGroupCommitService.java       | 103 ++++++++++++++
 .../store/rocksdb/RocksDBOptionsFactory.java       |   7 +-
 18 files changed, 474 insertions(+), 96 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 99e5b85d2e..e1edd2f512 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -344,7 +344,7 @@ public class BrokerController {
         this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new 
LmqBrokerStatsManager(this.brokerConfig) : new 
BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), 
this.brokerConfig.isEnableDetailStat());
         this.broadcastOffsetManager = new BroadcastOffsetManager(this);
         if 
(ConfigManagerVersion.V2.getVersion().equals(brokerConfig.getConfigManagerVersion()))
 {
-            this.configStorage = new 
ConfigStorage(messageStoreConfig.getStorePathRootDir());
+            this.configStorage = new ConfigStorage(messageStoreConfig);
             this.topicConfigManager = new TopicConfigManagerV2(this, 
configStorage);
             this.subscriptionGroupManager = new 
SubscriptionGroupManagerV2(this, configStorage);
             this.consumerOffsetManager = new ConsumerOffsetManagerV2(this, 
configStorage);
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigHelper.java 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigHelper.java
index 8183a1f835..29a7c313ba 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigHelper.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigHelper.java
@@ -64,7 +64,7 @@ public class ConfigHelper {
         return Optional.empty();
     }
 
-    public static void stampDataVersion(WriteBatch writeBatch, DataVersion 
dataVersion, long stateMachineVersion)
+    public static void stampDataVersion(WriteBatch writeBatch, TableId table, 
DataVersion dataVersion, long stateMachineVersion)
         throws RocksDBException {
         // Increase data version
         dataVersion.nextVersion(stateMachineVersion);
@@ -75,7 +75,7 @@ public class ConfigHelper {
         ByteBuf valueBuf = 
AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(Long.BYTES * 3);
         try {
             keyBuf.writeByte(TablePrefix.TABLE.getValue());
-            keyBuf.writeShort(TableId.CONSUMER_OFFSET.getValue());
+            keyBuf.writeShort(table.getValue());
             keyBuf.writeByte(RecordPrefix.DATA_VERSION.getValue());
             keyBuf.writeBytes(ConfigStorage.DATA_VERSION_KEY_BYTES);
             valueBuf.writeLong(dataVersion.getStateVersion());
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java
index 6bc62957a8..c4056d142f 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java
@@ -16,17 +16,29 @@
  */
 package org.apache.rocketmq.broker.config.v2;
 
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.PooledByteBufAllocatorMetric;
 import io.netty.util.internal.PlatformDependent;
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.config.AbstractRocksDBStorage;
 import org.apache.rocketmq.common.config.ConfigHelper;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.FlushOptions;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
@@ -43,8 +55,48 @@ public class ConfigStorage extends AbstractRocksDBStorage {
     public static final String DATA_VERSION_KEY = "data_version";
     public static final byte[] DATA_VERSION_KEY_BYTES = 
DATA_VERSION_KEY.getBytes(StandardCharsets.UTF_8);
 
-    public ConfigStorage(String storePath) {
-        super(storePath + File.separator + "config" + File.separator + "rdb");
+    private final ScheduledExecutorService scheduledExecutorService;
+
+    /**
+     * Number of write ops since previous flush.
+     */
+    private final AtomicInteger writeOpsCounter;
+
+    private final AtomicLong estimateWalFileSize = new AtomicLong(0L);
+
+    private final MessageStoreConfig messageStoreConfig;
+
+    private final FlushSyncService flushSyncService;
+
+    public ConfigStorage(MessageStoreConfig messageStoreConfig) {
+        super(messageStoreConfig.getStorePathRootDir() + File.separator + 
"config" + File.separator + "rdb");
+        this.messageStoreConfig = messageStoreConfig;
+        ThreadFactory threadFactory = new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setNameFormat("config-storage-%d")
+            .build();
+        scheduledExecutorService = new ScheduledThreadPoolExecutor(1, 
threadFactory);
+        writeOpsCounter = new AtomicInteger(0);
+        this.flushSyncService = new FlushSyncService();
+        this.flushSyncService.setDaemon(true);
+    }
+
+    private void statNettyMemory() {
+        PooledByteBufAllocatorMetric metric = 
AbstractRocksDBStorage.POOLED_ALLOCATOR.metric();
+        LOGGER.info("Netty Memory Usage: {}", metric);
+    }
+
+    @Override
+    public synchronized boolean start() {
+        boolean started = super.start();
+        if (started) {
+            scheduledExecutorService.scheduleWithFixedDelay(() -> 
statRocksdb(LOGGER), 1, 10, TimeUnit.SECONDS);
+            
scheduledExecutorService.scheduleWithFixedDelay(this::statNettyMemory, 10, 10, 
TimeUnit.SECONDS);
+            this.flushSyncService.start();
+        } else {
+            LOGGER.error("Failed to start config storage");
+        }
+        return started;
     }
 
     @Override
@@ -58,7 +110,7 @@ public class ConfigStorage extends AbstractRocksDBStorage {
             initOptions();
             List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();
 
-            ColumnFamilyOptions defaultOptions = 
ConfigHelper.createConfigOptions();
+            ColumnFamilyOptions defaultOptions = 
ConfigHelper.createConfigColumnFamilyOptions();
             this.cfOptions.add(defaultOptions);
             cfDescriptors.add(new 
ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, defaultOptions));
 
@@ -66,7 +118,7 @@ public class ConfigStorage extends AbstractRocksDBStorage {
             open(cfDescriptors);
 
             this.defaultCFHandle = cfHandles.get(0);
-        } catch (final Exception e) {
+        } catch (Exception e) {
             AbstractRocksDBStorage.LOGGER.error("postLoad Failed. {}", 
this.dbPath, e);
             return false;
         }
@@ -75,7 +127,8 @@ public class ConfigStorage extends AbstractRocksDBStorage {
 
     @Override
     protected void preShutdown() {
-
+        scheduledExecutorService.shutdown();
+        flushSyncService.shutdown();
     }
 
     protected void initOptions() {
@@ -105,6 +158,12 @@ public class ConfigStorage extends AbstractRocksDBStorage {
 
     public void write(WriteBatch writeBatch) throws RocksDBException {
         db.write(ableWalWriteOptions, writeBatch);
+        accountWriteOps(writeBatch.getDataSize());
+    }
+
+    private void accountWriteOps(long dataSize) {
+        writeOpsCounter.incrementAndGet();
+        estimateWalFileSize.addAndGet(dataSize);
     }
 
     public RocksIterator iterate(ByteBuffer beginKey, ByteBuffer endKey) {
@@ -125,4 +184,91 @@ public class ConfigStorage extends AbstractRocksDBStorage {
             return iterator;
         }
     }
+
+    /**
+     * RocksDB writes contain 3 stages: application memory buffer --> OS Page 
Cache --> Disk.
+     * Given that we are having DBOptions::manual_wal_flush, we need to 
manually call DB::FlushWAL and DB::SyncWAL
+     * Note: DB::FlushWAL(true) will internally call DB::SyncWAL.
+     * <p>
+     * See <a href="https://rocksdb.org/blog/2017/08/25/flushwal.html">Flush And Sync WAL</a>
+     */
+    class FlushSyncService extends ServiceThread {
+
+        private long lastSyncTime = 0;
+
+        private static final long MAX_SYNC_INTERVAL_IN_MILLIS = 100;
+
+        private final Stopwatch stopwatch = Stopwatch.createUnstarted();
+
+        private final FlushOptions flushOptions = new FlushOptions();
+
+        @Override
+        public String getServiceName() {
+            return "FlushSyncService";
+        }
+
+        @Override
+        public void run() {
+            flushOptions.setAllowWriteStall(false);
+            flushOptions.setWaitForFlush(true);
+            log.info("{} service started", this.getServiceName());
+            while (!this.isStopped()) {
+                try {
+                    this.waitForRunning(10);
+                    this.flushAndSyncWAL(false);
+                } catch (Exception e) {
+                    log.warn("{} service has exception. ", 
this.getServiceName(), e);
+                }
+            }
+            try {
+                flushAndSyncWAL(true);
+            } catch (Exception e) {
+                log.warn("{} raised an exception while performing 
flush-and-sync WAL on exit",
+                    this.getServiceName(), e);
+            }
+            flushOptions.close();
+            log.info("{} service end", this.getServiceName());
+        }
+
+        private void flushAndSyncWAL(boolean onExit) throws RocksDBException {
+            int writeOps = writeOpsCounter.get();
+            if (0 == writeOps) {
+                // No write ops to flush
+                return;
+            }
+
+            /*
+             * Normally, when MemTables become full then immutable, RocksDB 
threads will automatically flush them to L0
+             * SST files. The use case here is different: the MemTable may 
never get full and immutable given that the
+             * volume of data involved is relatively small. Further, we are 
constantly modifying the key-value pairs and
+             * generating WAL entries. The WAL file size can grow up to dozens 
of gigabytes without manual triggering of
+             * flush.
+             */
+            if (ConfigStorage.this.estimateWalFileSize.get() >= 
messageStoreConfig.getRocksdbWalFileRollingThreshold()) {
+                ConfigStorage.this.flush(flushOptions);
+                estimateWalFileSize.set(0L);
+            }
+
+            // Flush and Sync WAL if we have committed enough writes
+            if (writeOps >= messageStoreConfig.getRocksdbFlushWalFrequency() 
|| onExit) {
+                stopwatch.reset().start();
+                ConfigStorage.this.db.flushWal(true);
+                long elapsed = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
+                writeOpsCounter.getAndAdd(-writeOps);
+                lastSyncTime = System.currentTimeMillis();
+                LOGGER.debug("Flush and Sync WAL of RocksDB[{}] costs {}ms, 
write-ops={}", dbPath, elapsed, writeOps);
+                return;
+            }
+            // Flush and Sync WAL if some writes are out there for a period of 
time
+            long elapsedTime = System.currentTimeMillis() - lastSyncTime;
+            if (elapsedTime > MAX_SYNC_INTERVAL_IN_MILLIS) {
+                stopwatch.reset().start();
+                ConfigStorage.this.db.flushWal(true);
+                long elapsed = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
+                LOGGER.debug("Flush and Sync WAL of RocksDB[{}] costs {}ms, 
write-ops={}", dbPath, elapsed, writeOps);
+                writeOpsCounter.getAndAdd(-writeOps);
+                lastSyncTime = System.currentTimeMillis();
+            }
+        }
+    }
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
index 2c5d3677d8..1821c801cb 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
@@ -97,7 +97,7 @@ public class ConsumerOffsetManagerV2 extends 
ConsumerOffsetManager {
             // TODO: we have to make a copy here as WriteBatch lacks 
ByteBuffer API here
             writeBatch.deleteRange(ConfigHelper.readBytes(beginKey), 
ConfigHelper.readBytes(endKey));
             long stateMachineVersion = brokerController.getMessageStore() != 
null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
-            ConfigHelper.stampDataVersion(writeBatch, dataVersion, 
stateMachineVersion);
+            ConfigHelper.stampDataVersion(writeBatch, TableId.CONSUMER_OFFSET, 
dataVersion, stateMachineVersion);
             configStorage.write(writeBatch);
         } catch (RocksDBException e) {
             LOG.error("Failed to removeConsumerOffset, topicAtGroup={}", 
topicAtGroup, e);
@@ -138,7 +138,7 @@ public class ConsumerOffsetManagerV2 extends 
ConsumerOffsetManager {
             writeBatch.deleteRange(ConfigHelper.readBytes(beginKey), 
ConfigHelper.readBytes(endKey));
             MessageStore messageStore = brokerController.getMessageStore();
             long stateMachineVersion = messageStore != null ? 
messageStore.getStateMachineVersion() : 0;
-            ConfigHelper.stampDataVersion(writeBatch, dataVersion, 
stateMachineVersion);
+            ConfigHelper.stampDataVersion(writeBatch, TableId.CONSUMER_OFFSET, 
dataVersion, stateMachineVersion);
             configStorage.write(writeBatch);
         } catch (RocksDBException e) {
             LOG.error("Failed to consumer offsets by group={}", group, e);
@@ -194,7 +194,7 @@ public class ConsumerOffsetManagerV2 extends 
ConsumerOffsetManager {
             writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer());
             MessageStore messageStore = brokerController.getMessageStore();
             long stateMachineVersion = messageStore != null ? 
messageStore.getStateMachineVersion() : 0;
-            ConfigHelper.stampDataVersion(writeBatch, dataVersion, 
stateMachineVersion);
+            ConfigHelper.stampDataVersion(writeBatch, TableId.CONSUMER_OFFSET, 
dataVersion, stateMachineVersion);
             configStorage.write(writeBatch);
         } catch (RocksDBException e) {
             LOG.error("Failed to commit consumer offset", e);
@@ -394,7 +394,7 @@ public class ConsumerOffsetManagerV2 extends 
ConsumerOffsetManager {
         try (WriteBatch writeBatch = new WriteBatch()) {
             writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer());
             long stateMachineVersion = brokerController.getMessageStore() != 
null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
-            ConfigHelper.stampDataVersion(writeBatch, dataVersion, 
stateMachineVersion);
+            ConfigHelper.stampDataVersion(writeBatch, TableId.PULL_OFFSET, 
dataVersion, stateMachineVersion);
             configStorage.write(writeBatch);
         } catch (RocksDBException e) {
             LOG.error("Failed to commit pull offset. group={}, topic={}, 
queueId={}, offset={}",
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java
index dea8a2d2c1..dd67871f18 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java
@@ -137,8 +137,10 @@ public class SubscriptionGroupManagerV2 extends 
SubscriptionGroupManager {
         try (WriteBatch writeBatch = new WriteBatch()) {
             writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer());
             long stateMachineVersion = brokerController.getMessageStore() != 
null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
-            ConfigHelper.stampDataVersion(writeBatch, dataVersion, 
stateMachineVersion);
+            ConfigHelper.stampDataVersion(writeBatch, 
TableId.SUBSCRIPTION_GROUP, dataVersion, stateMachineVersion);
             configStorage.write(writeBatch);
+            // fdatasync on core metadata change
+            persist();
         } catch (RocksDBException e) {
             log.error("update subscription group config error", e);
         } finally {
@@ -163,7 +165,7 @@ public class SubscriptionGroupManagerV2 extends 
SubscriptionGroupManager {
         try (WriteBatch writeBatch = new WriteBatch()) {
             writeBatch.delete(ConfigHelper.readBytes(keyBuf));
             long stateMachineVersion = 
brokerController.getMessageStore().getStateMachineVersion();
-            ConfigHelper.stampDataVersion(writeBatch, dataVersion, 
stateMachineVersion);
+            ConfigHelper.stampDataVersion(writeBatch, 
TableId.SUBSCRIPTION_GROUP, dataVersion, stateMachineVersion);
             configStorage.write(writeBatch);
         } catch (RocksDBException e) {
             log.error("Failed to remove subscription group config by 
group-name={}", groupName, e);
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2.java
index 4e36b08727..7991d70445 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2.java
@@ -151,8 +151,10 @@ public class TopicConfigManagerV2 extends 
TopicConfigManager {
         try (WriteBatch writeBatch = new WriteBatch()) {
             writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer());
             long stateMachineVersion = brokerController.getMessageStore() != 
null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
-            ConfigHelper.stampDataVersion(writeBatch, dataVersion, 
stateMachineVersion);
+            ConfigHelper.stampDataVersion(writeBatch, TableId.TOPIC, 
dataVersion, stateMachineVersion);
             configStorage.write(writeBatch);
+            // fdatasync on core metadata change
+            this.persist();
         } catch (RocksDBException e) {
             log.error("Failed to update topic config", e);
         } finally {
@@ -167,7 +169,7 @@ public class TopicConfigManagerV2 extends 
TopicConfigManager {
         try (WriteBatch writeBatch = new WriteBatch()) {
             writeBatch.delete(keyBuf.nioBuffer());
             long stateMachineVersion = brokerController.getMessageStore() != 
null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
-            ConfigHelper.stampDataVersion(writeBatch, dataVersion, 
stateMachineVersion);
+            ConfigHelper.stampDataVersion(writeBatch, TableId.TOPIC, 
dataVersion, stateMachineVersion);
             configStorage.write(writeBatch);
         } catch (RocksDBException e) {
             log.error("Failed to delete topic config by topicName={}", 
topicName, e);
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2Test.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2Test.java
index d7f46855e1..132bd5c1a5 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2Test.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2Test.java
@@ -23,6 +23,7 @@ import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -44,6 +45,8 @@ public class ConsumerOffsetManagerV2Test {
     @Mock
     private BrokerController controller;
 
+    private MessageStoreConfig messageStoreConfig;
+
     @Rule
     public TemporaryFolder tf = new TemporaryFolder();
 
@@ -60,7 +63,9 @@ public class ConsumerOffsetManagerV2Test {
         Mockito.doReturn(brokerConfig).when(controller).getBrokerConfig();
 
         File configStoreDir = tf.newFolder();
-        configStorage = new ConfigStorage(configStoreDir.getAbsolutePath());
+        messageStoreConfig = new MessageStoreConfig();
+        
messageStoreConfig.setStorePathRootDir(configStoreDir.getAbsolutePath());
+        configStorage = new ConfigStorage(messageStoreConfig);
         configStorage.start();
         consumerOffsetManagerV2 = new ConsumerOffsetManagerV2(controller, 
configStorage);
     }
@@ -84,7 +89,9 @@ public class ConsumerOffsetManagerV2Test {
         consumerOffsetManagerV2.getOffsetTable().clear();
         Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group, 
topic, queueId));
 
+        configStorage = new ConfigStorage(messageStoreConfig);
         configStorage.start();
+        consumerOffsetManagerV2 = new ConsumerOffsetManagerV2(controller, 
configStorage);
         consumerOffsetManagerV2.load();
         Assert.assertEquals(queueOffset, 
consumerOffsetManagerV2.queryOffset(group, topic, queueId));
     }
@@ -106,7 +113,9 @@ public class ConsumerOffsetManagerV2Test {
 
         configStorage.shutdown();
 
+        configStorage = new ConfigStorage(messageStoreConfig);
         configStorage.start();
+        consumerOffsetManagerV2 = new ConsumerOffsetManagerV2(controller, 
configStorage);
         consumerOffsetManagerV2.load();
         Assert.assertEquals(queueOffset, 
consumerOffsetManagerV2.queryOffset(group, topic, queueId));
     }
@@ -129,7 +138,9 @@ public class ConsumerOffsetManagerV2Test {
 
         configStorage.shutdown();
 
+        configStorage = new ConfigStorage(messageStoreConfig);
         configStorage.start();
+        consumerOffsetManagerV2 = new ConsumerOffsetManagerV2(controller, 
configStorage);
         consumerOffsetManagerV2.load();
         Assert.assertEquals(queueOffset, 
consumerOffsetManagerV2.queryPullOffset(group, topic, queueId));
     }
@@ -157,7 +168,10 @@ public class ConsumerOffsetManagerV2Test {
         Assert.assertEquals(queueOffset, 
consumerOffsetManagerV2.queryOffset(group, topic2, queueId));
 
         configStorage.shutdown();
+
+        configStorage = new ConfigStorage(messageStoreConfig);
         configStorage.start();
+        consumerOffsetManagerV2 = new ConsumerOffsetManagerV2(controller, 
configStorage);
         consumerOffsetManagerV2.load();
         Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group, 
topic, queueId));
         Assert.assertEquals(queueOffset, 
consumerOffsetManagerV2.queryOffset(group, topic2, queueId));
@@ -184,7 +198,10 @@ public class ConsumerOffsetManagerV2Test {
         Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group, 
topic2, queueId));
 
         configStorage.shutdown();
+
+        configStorage = new ConfigStorage(messageStoreConfig);
         configStorage.start();
+        consumerOffsetManagerV2 = new ConsumerOffsetManagerV2(controller, 
configStorage);
         consumerOffsetManagerV2.load();
         Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group, 
topic, queueId));
         Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group, 
topic2, queueId));
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java
index 6d436a7c4d..4ff8a81e60 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java
@@ -25,6 +25,7 @@ import 
org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicy;
 import org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicyType;
 import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -38,6 +39,9 @@ import org.mockito.junit.MockitoJUnitRunner;
 
 @RunWith(MockitoJUnitRunner.class)
 public class SubscriptionGroupManagerV2Test {
+
+    private MessageStoreConfig messageStoreConfig;
+
     private ConfigStorage configStorage;
 
     private SubscriptionGroupManagerV2 subscriptionGroupManagerV2;
@@ -68,7 +72,9 @@ public class SubscriptionGroupManagerV2Test {
         Mockito.doReturn(1L).when(messageStore).getStateMachineVersion();
 
         File configStoreDir = tf.newFolder();
-        configStorage = new ConfigStorage(configStoreDir.getAbsolutePath());
+        messageStoreConfig = new MessageStoreConfig();
+        
messageStoreConfig.setStorePathRootDir(configStoreDir.getAbsolutePath());
+        configStorage = new ConfigStorage(messageStoreConfig);
         configStorage.start();
         subscriptionGroupManagerV2 = new 
SubscriptionGroupManagerV2(controller, configStorage);
     }
@@ -98,7 +104,10 @@ public class SubscriptionGroupManagerV2Test {
 
         subscriptionGroupManagerV2.getSubscriptionGroupTable().clear();
         configStorage.shutdown();
+
+        configStorage = new ConfigStorage(messageStoreConfig);
         configStorage.start();
+        subscriptionGroupManagerV2 = new 
SubscriptionGroupManagerV2(controller, configStorage);
         subscriptionGroupManagerV2.load();
         found = 
subscriptionGroupManagerV2.findSubscriptionGroupConfig(subscriptionGroupConfig.getGroupName());
         Assert.assertEquals(subscriptionGroupConfig, found);
@@ -132,7 +141,11 @@ public class SubscriptionGroupManagerV2Test {
         Assert.assertNull(found);
 
         configStorage.shutdown();
+
+        configStorage = new ConfigStorage(messageStoreConfig);
         configStorage.start();
+
+        subscriptionGroupManagerV2 = new 
SubscriptionGroupManagerV2(controller, configStorage);
         subscriptionGroupManagerV2.load();
         found = 
subscriptionGroupManagerV2.findSubscriptionGroupConfig(subscriptionGroupConfig.getGroupName());
         Assert.assertNull(found);
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java
index 92c936b110..731a1f538f 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.junit.After;
 import org.junit.Assert;
@@ -35,17 +36,19 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 
-
 @RunWith(value = MockitoJUnitRunner.class)
 public class TopicConfigManagerV2Test {
 
-    private ConfigStorage configStorage;
+    private MessageStoreConfig messageStoreConfig;
 
-    private TopicConfigManagerV2 topicConfigManagerV2;
+    private ConfigStorage configStorage;
 
     @Mock
     private BrokerController controller;
 
+    @Mock
+    private MessageStore messageStore;
+
     @Rule
     public TemporaryFolder tf = new TemporaryFolder();
 
@@ -61,17 +64,22 @@ public class TopicConfigManagerV2Test {
         BrokerConfig brokerConfig = new BrokerConfig();
         Mockito.doReturn(brokerConfig).when(controller).getBrokerConfig();
 
-        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig = new MessageStoreConfig();
         
Mockito.doReturn(messageStoreConfig).when(controller).getMessageStoreConfig();
+        Mockito.doReturn(messageStore).when(controller).getMessageStore();
 
         File configStoreDir = tf.newFolder();
-        configStorage = new ConfigStorage(configStoreDir.getAbsolutePath());
+        
messageStoreConfig.setStorePathRootDir(configStoreDir.getAbsolutePath());
+
+        configStorage = new ConfigStorage(messageStoreConfig);
         configStorage.start();
-        topicConfigManagerV2 = new TopicConfigManagerV2(controller, 
configStorage);
     }
 
     @Test
     public void testUpdateTopicConfig() {
+        TopicConfigManagerV2 topicConfigManagerV2 = new 
TopicConfigManagerV2(controller, configStorage);
+        topicConfigManagerV2.load();
+
         TopicConfig topicConfig = new TopicConfig();
         String topicName = "T1";
         topicConfig.setTopicName(topicName);
@@ -86,7 +94,9 @@ public class TopicConfigManagerV2Test {
 
         topicConfigManagerV2.getTopicConfigTable().clear();
 
+        configStorage = new ConfigStorage(messageStoreConfig);
         Assert.assertTrue(configStorage.start());
+        topicConfigManagerV2 = new TopicConfigManagerV2(controller, 
configStorage);
         Assert.assertTrue(topicConfigManagerV2.load());
 
         TopicConfig loaded = topicConfigManagerV2.selectTopicConfig(topicName);
@@ -111,12 +121,15 @@ public class TopicConfigManagerV2Test {
         topicConfig.setWriteQueueNums(4);
         topicConfig.setOrder(true);
         topicConfig.setTopicSysFlag(4);
+        TopicConfigManagerV2 topicConfigManagerV2 = new 
TopicConfigManagerV2(controller, configStorage);
         topicConfigManagerV2.updateTopicConfig(topicConfig);
         topicConfigManagerV2.removeTopicConfig(topicName);
         Assert.assertFalse(topicConfigManagerV2.containsTopic(topicName));
         Assert.assertTrue(configStorage.shutdown());
 
+        configStorage = new ConfigStorage(messageStoreConfig);
         Assert.assertTrue(configStorage.start());
+        topicConfigManagerV2 = new TopicConfigManagerV2(controller, 
configStorage);
         Assert.assertTrue(topicConfigManagerV2.load());
         Assert.assertFalse(topicConfigManagerV2.containsTopic(topicName));
     }
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java
index 4b320eb53f..6a805b0434 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java
@@ -23,11 +23,11 @@ import java.util.HashMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.collections.MapUtils;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager;
 import org.apache.rocketmq.common.BrokerConfig;
-import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.store.DefaultMessageStore;
@@ -38,6 +38,7 @@ import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
 import org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface;
 import org.apache.rocketmq.store.queue.CqUnit;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.awaitility.Awaitility;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -135,6 +136,7 @@ public class RocksdbTransferOffsetAndCqTest {
         }
         RocksDBMessageStore kvStore = 
defaultMessageStore.getRocksDBMessageStore();
         ConsumeQueueStoreInterface store = kvStore.getConsumeQueueStore();
+        store.start();
         ConsumeQueueInterface rocksdbCq = 
defaultMessageStore.getRocksDBMessageStore().findConsumeQueue(topic, queueId);
         ConsumeQueueInterface fileCq = 
defaultMessageStore.findConsumeQueue(topic, queueId);
         for (int i = 0; i < 200; i++) {
@@ -142,13 +144,21 @@ public class RocksdbTransferOffsetAndCqTest {
             fileCq.putMessagePositionInfoWrapper(request);
             store.putMessagePositionInfoWrapper(request);
         }
+        Awaitility.await()
+            .pollInterval(100, TimeUnit.MILLISECONDS)
+            .atMost(3, TimeUnit.SECONDS)
+            .until(() -> rocksdbCq.getMaxOffsetInQueue() == 200);
         Pair<CqUnit, Long> unit = rocksdbCq.getCqUnitAndStoreTime(100);
         Pair<CqUnit, Long> unit1 = fileCq.getCqUnitAndStoreTime(100);
-        Assert.assertTrue(unit.getObject1().getPos() == 
unit1.getObject1().getPos());
+        Assert.assertEquals(unit.getObject1().getPos(), 
unit1.getObject1().getPos());
     }
 
+    /**
+     * There is no need to skip the macOS platform.
+     * @return true if the current platform is NOT a good fit for this test case.
+     */
     private boolean notToBeExecuted() {
-        return MixAll.isMac();
+        return false;
     }
 
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
 
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
index 28ed4e924c..48ba4b8086 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
@@ -121,14 +121,16 @@ public abstract class AbstractRocksDBStorage {
         this.writeOptions = new WriteOptions();
         this.writeOptions.setSync(false);
         this.writeOptions.setDisableWAL(true);
-        this.writeOptions.setNoSlowdown(true);
+        // https://github.com/facebook/rocksdb/wiki/Write-Stalls
+        this.writeOptions.setNoSlowdown(false);
     }
 
     protected void initAbleWalWriteOptions() {
         this.ableWalWriteOptions = new WriteOptions();
         this.ableWalWriteOptions.setSync(false);
         this.ableWalWriteOptions.setDisableWAL(false);
-        this.ableWalWriteOptions.setNoSlowdown(true);
+        // https://github.com/facebook/rocksdb/wiki/Write-Stalls
+        this.ableWalWriteOptions.setNoSlowdown(false);
     }
 
     protected void initReadOptions() {
@@ -363,7 +365,7 @@ public abstract class AbstractRocksDBStorage {
         }
         if (postLoad()) {
             this.loaded = true;
-            LOGGER.info("start OK. {}", this.dbPath);
+            LOGGER.info("RocksDB[{}] starts OK", this.dbPath);
             this.closed = false;
             return true;
         } else {
@@ -560,7 +562,15 @@ public abstract class AbstractRocksDBStorage {
 
     public void statRocksdb(Logger logger) {
         try {
+            // Log Memory Usage
+            String blockCacheMemUsage = 
this.db.getProperty("rocksdb.block-cache-usage");
+            String indexesAndFilterBlockMemUsage = 
this.db.getProperty("rocksdb.estimate-table-readers-mem");
+            String memTableMemUsage = 
this.db.getProperty("rocksdb.cur-size-all-mem-tables");
+            String blocksPinnedByIteratorMemUsage = 
this.db.getProperty("rocksdb.block-cache-pinned-usage");
+            logger.info("RocksDB Memory Usage: BlockCache: {}, 
IndexesAndFilterBlock: {}, MemTable: {}, BlocksPinnedByIterator: {}",
+                blockCacheMemUsage, indexesAndFilterBlockMemUsage, 
memTableMemUsage, blocksPinnedByIteratorMemUsage);
 
+            // Log file metadata by level
             List<LiveFileMetaData> liveFileMetaDataList = 
this.getCompactionStatus();
             if (liveFileMetaDataList == null || 
liveFileMetaDataList.isEmpty()) {
                 return;
@@ -570,21 +580,13 @@ public abstract class AbstractRocksDBStorage {
                 StringBuilder sb = map.computeIfAbsent(metaData.level(), k -> 
new StringBuilder(256));
                 sb.append(new String(metaData.columnFamilyName(), 
StandardCharsets.UTF_8)).append(SPACE).
                     append(metaData.fileName()).append(SPACE).
-                    append("s: ").append(metaData.size()).append(SPACE).
-                    append("a: ").append(metaData.numEntries()).append(SPACE).
-                    append("r: 
").append(metaData.numReadsSampled()).append(SPACE).
-                    append("d: 
").append(metaData.numDeletions()).append(SPACE).
-                    append(metaData.beingCompacted()).append("\n");
+                    append("file-size: 
").append(metaData.size()).append(SPACE).
+                    append("number-of-entries: 
").append(metaData.numEntries()).append(SPACE).
+                    append("file-read-times: 
").append(metaData.numReadsSampled()).append(SPACE).
+                    append("deletions: 
").append(metaData.numDeletions()).append(SPACE).
+                    append("being-compacted: 
").append(metaData.beingCompacted()).append("\n");
             }
-
             map.forEach((key, value) -> logger.info("level: {}\n{}", key, 
value.toString()));
-
-            String blockCacheMemUsage = 
this.db.getProperty("rocksdb.block-cache-usage");
-            String indexesAndFilterBlockMemUsage = 
this.db.getProperty("rocksdb.estimate-table-readers-mem");
-            String memTableMemUsage = 
this.db.getProperty("rocksdb.cur-size-all-mem-tables");
-            String blocksPinnedByIteratorMemUsage = 
this.db.getProperty("rocksdb.block-cache-pinned-usage");
-            logger.info("MemUsage. blockCache: {}, indexesAndFilterBlock: {}, 
MemTable: {}, blocksPinnedByIterator: {}",
-                blockCacheMemUsage, indexesAndFilterBlockMemUsage, 
memTableMemUsage, blocksPinnedByIteratorMemUsage);
         } catch (Exception ignored) {
         }
     }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/config/ConfigHelper.java 
b/common/src/main/java/org/apache/rocketmq/common/config/ConfigHelper.java
index a4ba35bd5a..e3f6f22002 100644
--- a/common/src/main/java/org/apache/rocketmq/common/config/ConfigHelper.java
+++ b/common/src/main/java/org/apache/rocketmq/common/config/ConfigHelper.java
@@ -38,7 +38,7 @@ import org.rocksdb.WALRecoveryMode;
 import org.rocksdb.util.SizeUnit;
 
 public class ConfigHelper {
-    public static ColumnFamilyOptions createConfigOptions() {
+    public static ColumnFamilyOptions createConfigColumnFamilyOptions() {
         BlockBasedTableConfig blockBasedTableConfig = new 
BlockBasedTableConfig().
             setFormatVersion(5).
             setIndexType(IndexType.kBinarySearch).
@@ -46,7 +46,7 @@ public class ConfigHelper {
             setBlockSize(32 * SizeUnit.KB).
             setFilterPolicy(new BloomFilter(16, false)).
             // Indicating if we'd put index/filter blocks to the block cache.
-                setCacheIndexAndFilterBlocks(false).
+            setCacheIndexAndFilterBlocks(true).
             setCacheIndexAndFilterBlocksWithHighPriority(true).
             setPinL0FilterAndIndexBlocksInCache(false).
             setPinTopLevelIndexAndFilter(true).
@@ -54,9 +54,8 @@ public class ConfigHelper {
             setWholeKeyFiltering(true);
 
         ColumnFamilyOptions options = new ColumnFamilyOptions();
-        return options.setMaxWriteBufferNumber(2).
-            // MemTable size, MemTable(cache) -> immutable MemTable(cache) -> 
SST(disk)
-                setWriteBufferSize(8 * SizeUnit.MB).
+        return options.setMaxWriteBufferNumber(4).
+            setWriteBufferSize(64 * SizeUnit.MB).
             setMinWriteBufferNumberToMerge(1).
             setTableFormatConfig(blockBasedTableConfig).
             setMemTableConfig(new SkipListMemTableConfig()).
@@ -67,17 +66,17 @@ public class ConfigHelper {
             setLevel0SlowdownWritesTrigger(8).
             setLevel0StopWritesTrigger(12).
             // The target file size for compaction.
-                setTargetFileSizeBase(64 * SizeUnit.MB).
+            setTargetFileSizeBase(64 * SizeUnit.MB).
             setTargetFileSizeMultiplier(2).
             // The upper-bound of the total size of L1 files in bytes
-                setMaxBytesForLevelBase(256 * SizeUnit.MB).
+            setMaxBytesForLevelBase(256 * SizeUnit.MB).
             setMaxBytesForLevelMultiplier(2).
             setMergeOperator(new StringAppendOperator()).
             setInplaceUpdateSupport(true);
     }
 
     public static DBOptions createConfigDBOptions() {
-        //Turn based on 
https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide
+        // Tune based on 
https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide
         // and 
http://gitlab.alibaba-inc.com/aloha/aloha/blob/branch_2_5_0/jstorm-core/src/main/java/com/alibaba/jstorm/cache/rocksdb/RocksDbOptionsFactory.java
         DBOptions options = new DBOptions();
         Statistics statistics = new Statistics();
@@ -86,10 +85,20 @@ public class ConfigHelper {
             setDbLogDir(getDBLogDir()).
             setInfoLogLevel(InfoLogLevel.INFO_LEVEL).
             setWalRecoveryMode(WALRecoveryMode.SkipAnyCorruptedRecords).
+            /*
+             * We use manual flush to achieve desired balance between 
reliability and performance:
+             * for metadata that matters, including {topic, 
subscription}-config changes, each write incurs a
+             * flush-and-sync to ensure reliability; for {commit, pull}-offset 
advancements, a group-flush is performed
+             * every N (configurable, 1024 by default) writes or once pending 
writes have aged, similar to the OS page-cache flush
+             * mechanism.
+             */
             setManualWalFlush(true).
-            setMaxTotalWalSize(500 * SizeUnit.MB).
-            setWalSizeLimitMB(0).
-            setWalTtlSeconds(0).
+            // This option takes effect only when we have multiple column 
families
+            // https://github.com/facebook/rocksdb/issues/4180
+            // setMaxTotalWalSize(1024 * SizeUnit.MB).
+            setDbWriteBufferSize(128 * SizeUnit.MB).
+            setBytesPerSync(SizeUnit.MB).
+            setWalBytesPerSync(SizeUnit.MB).
             setCreateIfMissing(true).
             setCreateMissingColumnFamilies(true).
             setMaxOpenFiles(-1).
@@ -99,7 +108,6 @@ public class ConfigHelper {
             setAllowConcurrentMemtableWrite(false).
             setStatistics(statistics).
             setStatsDumpPeriodSec(600).
-            setAtomicFlush(true).
             setMaxBackgroundJobs(32).
             setMaxSubcompactions(4).
             setParanoidChecks(true).
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
 
b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
index 3b924a6a0d..5fd9bab2d7 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/config/ConfigRocksDBStorage.java
@@ -70,7 +70,7 @@ public class ConfigRocksDBStorage extends 
AbstractRocksDBStorage {
 
             final List<ColumnFamilyDescriptor> cfDescriptors = new 
ArrayList<>();
 
-            ColumnFamilyOptions defaultOptions = 
ConfigHelper.createConfigOptions();
+            ColumnFamilyOptions defaultOptions = 
ConfigHelper.createConfigColumnFamilyOptions();
             this.cfOptions.add(defaultOptions);
             cfDescriptors.add(new 
ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, defaultOptions));
             cfDescriptors.add(new 
ColumnFamilyDescriptor(KV_DATA_VERSION_COLUMN_FAMILY_NAME, defaultOptions));
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java 
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 7cf9746551..d30691908b 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -326,22 +326,26 @@ public class CommitLog implements Swappable {
         boolean checkDupInfo = 
this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable();
         final List<MappedFile> mappedFiles = 
this.mappedFileQueue.getMappedFiles();
         if (!mappedFiles.isEmpty()) {
-            // Began to recover from the last third file
-            int index = mappedFiles.size() - 3;
-            if (index < 0) {
-                index = 0;
+            int index = mappedFiles.size() - 1;
+            while (index > 0) {
+                MappedFile mappedFile = mappedFiles.get(index);
+                if (mappedFile.getFileFromOffset() <= 
maxPhyOffsetOfConsumeQueue) {
+                    // It's safe to recover from this mapped file
+                    break;
+                }
+                index--;
             }
+            // TODO: Discuss if we need to load more commit-log mapped files 
into memory.
 
             MappedFile mappedFile = mappedFiles.get(index);
             ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
             long processOffset = mappedFile.getFileFromOffset();
             long mappedFileOffset = 0;
             long lastValidMsgPhyOffset = this.getConfirmOffset();
-            // normal recover doesn't require dispatching
-            boolean doDispatch = false;
             while (true) {
                 DispatchRequest dispatchRequest = 
this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, checkDupInfo);
                 int size = dispatchRequest.getMsgSize();
+                boolean doDispatch = dispatchRequest.getCommitLogOffset() > 
maxPhyOffsetOfConsumeQueue;
                 // Normal data
                 if (dispatchRequest.isSuccess() && size > 0) {
                     lastValidMsgPhyOffset = processOffset + mappedFileOffset;
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index fe090e3fa2..6dfdc0b1c8 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -23,6 +23,7 @@ import org.apache.rocketmq.store.ConsumeQueue;
 import org.apache.rocketmq.store.StoreType;
 import org.apache.rocketmq.store.queue.BatchConsumeQueue;
 import org.rocksdb.CompressionType;
+import org.rocksdb.util.SizeUnit;
 
 public class MessageStoreConfig {
 
@@ -444,6 +445,13 @@ public class MessageStoreConfig {
 
     private String rocksdbCompressionType = 
CompressionType.LZ4_COMPRESSION.getLibraryName();
 
+    /**
+     * RocksDB WAL flush frequency, i.e., flush the WAL every N write operations.
+     */
+    private int rocksdbFlushWalFrequency = 1024;
+
+    private long rocksdbWalFileRollingThreshold = SizeUnit.GB;
+
     public String getRocksdbCompressionType() {
         return rocksdbCompressionType;
     }
@@ -1902,6 +1910,22 @@ public class MessageStoreConfig {
         this.bottomMostCompressionTypeForConsumeQueueStore = 
bottomMostCompressionTypeForConsumeQueueStore;
     }
 
+    public int getRocksdbFlushWalFrequency() {
+        return rocksdbFlushWalFrequency;
+    }
+
+    public void setRocksdbFlushWalFrequency(int rocksdbFlushWalFrequency) {
+        this.rocksdbFlushWalFrequency = rocksdbFlushWalFrequency;
+    }
+
+    public long getRocksdbWalFileRollingThreshold() {
+        return rocksdbWalFileRollingThreshold;
+    }
+
+    public void setRocksdbWalFileRollingThreshold(long 
rocksdbWalFileRollingThreshold) {
+        this.rocksdbWalFileRollingThreshold = rocksdbWalFileRollingThreshold;
+    }
+
     public int getSpinLockCollisionRetreatOptimalDegree() {
         return spinLockCollisionRetreatOptimalDegree;
     }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
 
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
index 0242ec2309..7e3aa70d02 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
@@ -30,12 +30,14 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nonnull;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.ServiceState;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageConst;
@@ -84,6 +86,10 @@ public class RocksDBConsumeQueueStore extends 
AbstractConsumeQueueStore {
 
     private final OffsetInitializer offsetInitializer;
 
+    private final RocksGroupCommitService groupCommitService;
+
+    private final AtomicReference<ServiceState> serviceState = new 
AtomicReference<>(ServiceState.CREATE_JUST);
+
     public RocksDBConsumeQueueStore(DefaultMessageStore messageStore) {
         super(messageStore);
 
@@ -93,6 +99,7 @@ public class RocksDBConsumeQueueStore extends 
AbstractConsumeQueueStore {
         this.rocksDBConsumeQueueOffsetTable = new 
RocksDBConsumeQueueOffsetTable(rocksDBConsumeQueueTable, rocksDBStorage, 
messageStore);
 
         this.offsetInitializer = new OffsetInitializerRocksDBImpl(this);
+        this.groupCommitService = new RocksGroupCommitService(this);
         this.cqBBPairList = new ArrayList<>(16);
         this.offsetBBPairList = new ArrayList<>(DEFAULT_BYTE_BUFFER_CAPACITY);
         for (int i = 0; i < DEFAULT_BYTE_BUFFER_CAPACITY; i++) {
@@ -123,14 +130,17 @@ public class RocksDBConsumeQueueStore extends 
AbstractConsumeQueueStore {
 
     @Override
     public void start() {
-        log.info("RocksDB ConsumeQueueStore start!");
-        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
-            this.rocksDBStorage.statRocksdb(ROCKSDB_LOG);
-        }, 10, this.messageStoreConfig.getStatRocksDBCQIntervalSec(), 
TimeUnit.SECONDS);
-
-        this.scheduledExecutorService.scheduleWithFixedDelay(() -> {
-            cleanDirty(messageStore.getTopicConfigs().keySet());
-        }, 10, this.messageStoreConfig.getCleanRocksDBDirtyCQIntervalMin(), 
TimeUnit.MINUTES);
+        if (serviceState.compareAndSet(ServiceState.CREATE_JUST, 
ServiceState.RUNNING)) {
+            log.info("RocksDB ConsumeQueueStore start!");
+            this.groupCommitService.start();
+            this.scheduledExecutorService.scheduleAtFixedRate(() -> {
+                this.rocksDBStorage.statRocksdb(ROCKSDB_LOG);
+            }, 10, this.messageStoreConfig.getStatRocksDBCQIntervalSec(), 
TimeUnit.SECONDS);
+
+            this.scheduledExecutorService.scheduleWithFixedDelay(() -> {
+                cleanDirty(messageStore.getTopicConfigs().keySet());
+            }, 10, 
this.messageStoreConfig.getCleanRocksDBDirtyCQIntervalMin(), TimeUnit.MINUTES);
+        }
     }
 
     private void cleanDirty(final Set<String> existTopicSet) {
@@ -165,18 +175,23 @@ public class RocksDBConsumeQueueStore extends 
AbstractConsumeQueueStore {
 
     @Override
     public void recover() {
-        // ignored
+        start();
     }
 
     @Override
     public boolean recoverConcurrently() {
+        start();
         return true;
     }
 
     @Override
     public boolean shutdown() {
-        this.scheduledExecutorService.shutdown();
-        return shutdownInner();
+        if (serviceState.compareAndSet(ServiceState.RUNNING, 
ServiceState.SHUTDOWN_ALREADY)) {
+            this.groupCommitService.shutdown();
+            this.scheduledExecutorService.shutdown();
+            return shutdownInner();
+        }
+        return true;
     }
 
     private boolean shutdownInner() {
@@ -188,23 +203,25 @@ public class RocksDBConsumeQueueStore extends 
AbstractConsumeQueueStore {
         if (null == request) {
             return;
         }
-        // We are taking advantage of Atomic Flush, this operation is purely 
memory-based.
-        // batch and cache in Java heap does not make sense, instead, we 
should put the metadata into RocksDB immediately
-        // to optimized overall end-to-end latency.
-        putMessagePosition(request);
+
+        try {
+            groupCommitService.putRequest(request);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
     }
 
-    public void putMessagePosition(DispatchRequest request) throws 
RocksDBException {
+    public void putMessagePosition(List<DispatchRequest> requests) throws 
RocksDBException {
         final int maxRetries = 30;
         for (int i = 0; i < maxRetries; i++) {
-            if (putMessagePosition0(request)) {
+            if (putMessagePosition0(requests)) {
                 if (this.isCQError) {
                     
this.messageStore.getRunningFlags().clearLogicsQueueError();
                     this.isCQError = false;
                 }
                 return;
             } else {
-                ERROR_LOG.warn("{} put cq Failed. retryTime: {}", i);
+                ERROR_LOG.warn("Put cq Failed. retryTime: {}", i);
                 try {
                     Thread.sleep(100);
                 } catch (InterruptedException ignored) {
@@ -219,34 +236,43 @@ public class RocksDBConsumeQueueStore extends 
AbstractConsumeQueueStore {
         throw new RocksDBException("put CQ Failed");
     }
 
-    private boolean putMessagePosition0(DispatchRequest request) {
+    private boolean putMessagePosition0(List<DispatchRequest> requests) {
         if (!this.rocksDBStorage.hold()) {
             return false;
         }
 
         try (WriteBatch writeBatch = new WriteBatch()) {
+            final int size = requests.size();
+            if (size == 0) {
+                return true;
+            }
             long maxPhyOffset = 0;
-            DispatchEntry entry = DispatchEntry.from(request);
-            dispatch(entry, writeBatch);
-            dispatchLMQ(request, writeBatch);
-
-            final int msgSize = request.getMsgSize();
-            final long phyOffset = request.getCommitLogOffset();
-            if (phyOffset + msgSize >= maxPhyOffset) {
-                maxPhyOffset = phyOffset + msgSize;
+            for (int i = size - 1; i >= 0; i--) {
+                final DispatchRequest request = requests.get(i);
+                DispatchEntry entry = DispatchEntry.from(request);
+                dispatch(entry, writeBatch);
+                dispatchLMQ(request, writeBatch);
+
+                final int msgSize = request.getMsgSize();
+                final long phyOffset = request.getCommitLogOffset();
+                if (phyOffset + msgSize >= maxPhyOffset) {
+                    maxPhyOffset = phyOffset + msgSize;
+                }
             }
 
             
this.rocksDBConsumeQueueOffsetTable.putMaxPhyAndCqOffset(tempTopicQueueMaxOffsetMap,
 writeBatch, maxPhyOffset);
 
             this.rocksDBStorage.batchPut(writeBatch);
+
             
this.rocksDBConsumeQueueOffsetTable.putHeapMaxCqOffset(tempTopicQueueMaxOffsetMap);
-            long storeTimeStamp = request.getStoreTimestamp();
+
+            long storeTimeStamp = requests.get(size - 1).getStoreTimestamp();
             if (this.messageStore.getMessageStoreConfig().getBrokerRole() == 
BrokerRole.SLAVE
                 || 
this.messageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
                 
this.messageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimeStamp);
             }
             
this.messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(storeTimeStamp);
-            notifyMessageArrival(request);
+            notifyMessageArriveAndClear(requests);
             return true;
         } catch (Exception e) {
             ERROR_LOG.error("putMessagePosition0 failed.", e);
@@ -311,9 +337,12 @@ public class RocksDBConsumeQueueStore extends 
AbstractConsumeQueueStore {
         }
     }
 
-    private void notifyMessageArrival(DispatchRequest request) {
+    private void notifyMessageArriveAndClear(List<DispatchRequest> requests) {
         try {
-            this.messageStore.notifyMessageArriveIfNecessary(request);
+            for (DispatchRequest dp : requests) {
+                this.messageStore.notifyMessageArriveIfNecessary(dp);
+            }
+            requests.clear();
         } catch (Exception e) {
             ERROR_LOG.error("notifyMessageArriveAndClear Failed.", e);
         }
@@ -538,4 +567,8 @@ public class RocksDBConsumeQueueStore extends 
AbstractConsumeQueueStore {
         }
         return super.getMaxOffset(topic, queueId);
     }
+
+    public boolean isStopped() {
+        return ServiceState.SHUTDOWN_ALREADY == serviceState.get();
+    }
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksGroupCommitService.java
 
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksGroupCommitService.java
new file mode 100644
index 0000000000..e2f2c9ee2c
--- /dev/null
+++ 
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksGroupCommitService.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.queue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.store.DispatchRequest;
+import org.rocksdb.RocksDBException;
+
+public class RocksGroupCommitService extends ServiceThread {
+
+    private static final int MAX_BUFFER_SIZE = 100_000;
+
+    private static final int PREFERRED_DISPATCH_REQUEST_COUNT = 256;
+
+    private final LinkedBlockingQueue<DispatchRequest> buffer;
+
+    private final RocksDBConsumeQueueStore store;
+
+    private final List<DispatchRequest> requests = new 
ArrayList<>(PREFERRED_DISPATCH_REQUEST_COUNT);
+
+    public RocksGroupCommitService(RocksDBConsumeQueueStore store) {
+        this.store = store;
+        this.buffer = new LinkedBlockingQueue<>(MAX_BUFFER_SIZE);
+    }
+
+    @Override
+    public String getServiceName() {
+        return "RocksGroupCommit";
+    }
+
+    @Override
+    public void run() {
+        log.info("{} service started", this.getServiceName());
+        while (!this.isStopped()) {
+            try {
+                this.waitForRunning(10);
+                this.doCommit();
+            } catch (Exception e) {
+                log.warn("{} service has exception. ", this.getServiceName(), 
e);
+            }
+        }
+        log.info("{} service end", this.getServiceName());
+    }
+
+    public void putRequest(final DispatchRequest request) throws 
InterruptedException {
+        while (!buffer.offer(request, 3, TimeUnit.SECONDS)) {
+            log.warn("RocksGroupCommitService#buffer is full, 3s elapsed 
before space becomes available");
+        }
+        this.wakeup();
+    }
+
+    private void doCommit() {
+        while (!buffer.isEmpty()) {
+            while (true) {
+                DispatchRequest dispatchRequest = buffer.poll();
+                if (null != dispatchRequest) {
+                    requests.add(dispatchRequest);
+                }
+
+                if (requests.isEmpty()) {
+                    // buffer has been drained
+                    break;
+                }
+
+                if (null == dispatchRequest || requests.size() >= 
PREFERRED_DISPATCH_REQUEST_COUNT) {
+                    groupCommit();
+                }
+            }
+        }
+    }
+
+    private void groupCommit() {
+        while (!store.isStopped()) {
+            try {
+                // putMessagePosition clears requests once the consume queue 
has been built successfully
+                store.putMessagePosition(requests);
+                break;
+            } catch (RocksDBException e) {
+                log.error("Failed to build consume queue in RocksDB", e);
+            }
+        }
+    }
+
+}
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
 
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
index 66f5cbd095..2fac3bf485 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/rocksdb/RocksDBOptionsFactory.java
@@ -22,6 +22,7 @@ import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.BloomFilter;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.CompactionOptionsUniversal;
+import org.rocksdb.CompactionPriority;
 import org.rocksdb.CompactionStopStyle;
 import org.rocksdb.CompactionStyle;
 import org.rocksdb.CompressionType;
@@ -79,6 +80,7 @@ public class RocksDBOptionsFactory {
                 setCompressionType(compressionType).
                 setBottommostCompressionType(bottomMostCompressionType).
                 setNumLevels(7).
+                setCompactionPriority(CompactionPriority.MinOverlappingRatio).
                 setCompactionStyle(CompactionStyle.UNIVERSAL).
                 setCompactionOptionsUniversal(compactionOption).
                 setMaxCompactionBytes(100 * SizeUnit.GB).
@@ -144,10 +146,8 @@ public class RocksDBOptionsFactory {
                 setInfoLogLevel(InfoLogLevel.INFO_LEVEL).
                 setWalRecoveryMode(WALRecoveryMode.PointInTimeRecovery).
                 setManualWalFlush(true).
-                setMaxTotalWalSize(0).
-                setWalSizeLimitMB(0).
-                setWalTtlSeconds(0).
                 setCreateIfMissing(true).
+                setBytesPerSync(SizeUnit.MB).
                 setCreateMissingColumnFamilies(true).
                 setMaxOpenFiles(-1).
                 setMaxLogFileSize(SizeUnit.GB).
@@ -156,6 +156,7 @@ public class RocksDBOptionsFactory {
                 setAllowConcurrentMemtableWrite(false).
                 setStatistics(statistics).
                 setAtomicFlush(true).
+                setCompactionReadaheadSize(4 * SizeUnit.MB).
                 setMaxBackgroundJobs(32).
                 setMaxSubcompactions(8).
                 setParanoidChecks(true).

Reply via email to