This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new c8938e0c6c [ISSUE #8829] feat: provide ConfigManagerV2 to make best uses of RocksDB (#8856)
c8938e0c6c is described below

commit c8938e0c6c263cdb73b593ee9984841115604679
Author: Zhanhui Li <lizhan...@apache.org>
AuthorDate: Mon Oct 28 10:00:33 2024 +0800

    [ISSUE #8829] feat: provide ConfigManagerV2 to make best uses of RocksDB (#8856)
    
    * feat: provide ConfigManagerV2 to make best uses of RocksDB
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * fix: release RocksDB objects using try-with-resource
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    ---------
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
---
 .../apache/rocketmq/broker/BrokerController.java   |  39 +-
 .../v1}/RocksDBConsumerOffsetManager.java          |   3 +-
 .../v1}/RocksDBLmqConsumerOffsetManager.java       |   2 +-
 .../v1}/RocksDBLmqSubscriptionGroupManager.java    |   2 +-
 .../v1}/RocksDBLmqTopicConfigManager.java          |   2 +-
 .../v1}/RocksDBOffsetSerializeWrapper.java         |   2 +-
 .../v1}/RocksDBSubscriptionGroupManager.java       |   3 +-
 .../v1}/RocksDBTopicConfigManager.java             |   3 +-
 .../rocketmq/broker/config/v2/ConfigHelper.java    | 132 +++++++
 .../rocketmq/broker/config/v2/ConfigStorage.java   | 122 ++++++
 .../broker/config/v2/ConsumerOffsetManagerV2.java  | 426 +++++++++++++++++++++
 .../v2/RecordPrefix.java}                          |  21 +-
 .../v2/SerializationType.java}                     |  32 +-
 .../config/v2/SubscriptionGroupManagerV2.java      | 171 +++++++++
 .../v2/TableId.java}                               |  26 +-
 .../v2/TablePrefix.java}                           |  20 +-
 .../broker/config/v2/TopicConfigManagerV2.java     | 191 +++++++++
 .../v2/package-info.java}                          |  26 +-
 .../broker/offset/ConsumerOffsetManager.java       |   2 +-
 .../subscription/SubscriptionGroupManager.java     |   4 +-
 .../rocketmq/broker/topic/TopicConfigManager.java  |   4 +-
 .../offset/RocksDBConsumerOffsetManagerTest.java   |   1 +
 .../RocksDBLmqConsumerOffsetManagerTest.java       |   1 +
 .../offset/RocksDBOffsetSerializeWrapperTest.java  |   1 +
 .../offset/RocksdbTransferOffsetAndCqTest.java     |   1 +
 .../broker/processor/AdminBrokerProcessorTest.java |   4 +-
 .../RocksdbGroupConfigTransferTest.java            |   1 +
 .../topic/RocksdbTopicConfigManagerTest.java       |   1 +
 .../topic/RocksdbTopicConfigTransferTest.java      |   1 +
 .../org/apache/rocketmq/common/BrokerConfig.java   |  14 +
 .../common/config/ConfigManagerVersion.java        |  21 +-
 31 files changed, 1186 insertions(+), 93 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 05a00a5005..ee211e1b80 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -76,8 +76,8 @@ import org.apache.rocketmq.broker.offset.BroadcastOffsetManager;
 import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
 import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
 import org.apache.rocketmq.broker.offset.LmqConsumerOffsetManager;
-import org.apache.rocketmq.broker.offset.RocksDBConsumerOffsetManager;
-import org.apache.rocketmq.broker.offset.RocksDBLmqConsumerOffsetManager;
+import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager;
+import org.apache.rocketmq.broker.config.v1.RocksDBLmqConsumerOffsetManager;
 import org.apache.rocketmq.broker.out.BrokerOuterAPI;
 import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
 import org.apache.rocketmq.broker.processor.AckMessageProcessor;
@@ -99,12 +99,16 @@ import org.apache.rocketmq.broker.processor.SendMessageProcessor;
 import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
 import org.apache.rocketmq.broker.slave.SlaveSynchronize;
 import org.apache.rocketmq.broker.subscription.LmqSubscriptionGroupManager;
-import org.apache.rocketmq.broker.subscription.RocksDBLmqSubscriptionGroupManager;
-import org.apache.rocketmq.broker.subscription.RocksDBSubscriptionGroupManager;
+import org.apache.rocketmq.broker.config.v1.RocksDBLmqSubscriptionGroupManager;
+import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager;
 import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
+import org.apache.rocketmq.broker.config.v2.ConsumerOffsetManagerV2;
+import org.apache.rocketmq.broker.config.v2.SubscriptionGroupManagerV2;
+import org.apache.rocketmq.broker.config.v2.TopicConfigManagerV2;
+import org.apache.rocketmq.broker.config.v2.ConfigStorage;
 import org.apache.rocketmq.broker.topic.LmqTopicConfigManager;
-import org.apache.rocketmq.broker.topic.RocksDBLmqTopicConfigManager;
-import org.apache.rocketmq.broker.topic.RocksDBTopicConfigManager;
+import org.apache.rocketmq.broker.config.v1.RocksDBLmqTopicConfigManager;
+import org.apache.rocketmq.broker.config.v1.RocksDBTopicConfigManager;
 import org.apache.rocketmq.broker.topic.TopicConfigManager;
 import org.apache.rocketmq.broker.topic.TopicQueueMappingCleanService;
 import org.apache.rocketmq.broker.topic.TopicQueueMappingManager;
@@ -124,6 +128,7 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.config.ConfigManagerVersion;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -239,6 +244,11 @@ public class BrokerController {
     protected RemotingServer remotingServer;
     protected CountDownLatch remotingServerStartLatch;
     protected RemotingServer fastRemotingServer;
+
+    /**
+     * If {Topic, SubscriptionGroup, Offset}ManagerV2 are used, config entries are stored in RocksDB.
+     */
+    protected ConfigStorage configStorage;
     protected TopicConfigManager topicConfigManager;
     protected SubscriptionGroupManager subscriptionGroupManager;
     protected TopicQueueMappingManager topicQueueMappingManager;
@@ -334,7 +344,12 @@ public class BrokerController {
         this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), getListenPort()));
         this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new LmqBrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()) : new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat());
         this.broadcastOffsetManager = new BroadcastOffsetManager(this);
-        if (this.messageStoreConfig.isEnableRocksDBStore()) {
+        if (ConfigManagerVersion.V2.getVersion().equals(brokerConfig.getConfigManagerVersion())) {
+            this.configStorage = new ConfigStorage(messageStoreConfig.getStorePathRootDir());
+            this.topicConfigManager = new TopicConfigManagerV2(this, configStorage);
+            this.subscriptionGroupManager = new SubscriptionGroupManagerV2(this, configStorage);
+            this.consumerOffsetManager = new ConsumerOffsetManagerV2(this, configStorage);
+        } else if (this.messageStoreConfig.isEnableRocksDBStore()) {
             this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqTopicConfigManager(this) : new RocksDBTopicConfigManager(this);
             this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqSubscriptionGroupManager(this) : new RocksDBSubscriptionGroupManager(this);
             this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqConsumerOffsetManager(this) : new RocksDBConsumerOffsetManager(this);
@@ -771,7 +786,11 @@ public class BrokerController {
     }
 
     public boolean initializeMetadata() {
-        boolean result = this.topicConfigManager.load();
+        boolean result = true;
+        if (null != configStorage) {
+            result = configStorage.start();
+        }
+        result = result && this.topicConfigManager.load();
         result = result && this.topicQueueMappingManager.load();
         result = result && this.consumerOffsetManager.load();
         result = result && this.subscriptionGroupManager.load();
@@ -1547,6 +1566,10 @@ public class BrokerController {
             this.consumerOffsetManager.stop();
         }
 
+        if (null != configStorage) {
+            configStorage.shutdown();
+        }
+
         if (this.authenticationMetadataManager != null) {
             this.authenticationMetadataManager.shutdown();
         }
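
For context on the new branch above: the v2 managers are opt-in and sit behind BrokerConfig#getConfigManagerVersion(). A hedged sketch of how a deployment would switch over (the broker.conf key name below assumes the new BrokerConfig field follows RocketMQ's usual field-to-property mapping, and "v2" assumes that is the string returned by ConfigManagerVersion.V2.getVersion()):

    # broker.conf -- assumed property name and value, not confirmed by this diff
    configManagerVersion = v2

With that set, topic configs, subscription groups and consumer offsets are handled by TopicConfigManagerV2, SubscriptionGroupManagerV2 and ConsumerOffsetManagerV2 on top of a single ConfigStorage RocksDB instance rooted at <storePathRootDir>/config/rdb; initializeMetadata() starts that storage before loading the managers, and the broker shutdown path closes it.
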
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java
similarity index 98%
rename from broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java
rename to broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java
index 1e7cda71ee..8066fe769a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.broker.offset;
+package org.apache.rocketmq.broker.config.v1;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.serializer.SerializerFeature;
@@ -24,6 +24,7 @@ import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.RocksDBConfigManager;
+import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.utils.DataConverter;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBLmqConsumerOffsetManager.java
similarity index 98%
rename from broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManager.java
rename to broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBLmqConsumerOffsetManager.java
index d0faa66140..e961c6c635 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBLmqConsumerOffsetManager.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.broker.offset;
+package org.apache.rocketmq.broker.config.v1;
 
 import java.util.HashMap;
 import java.util.Map;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBLmqSubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBLmqSubscriptionGroupManager.java
similarity index 97%
rename from broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBLmqSubscriptionGroupManager.java
rename to broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBLmqSubscriptionGroupManager.java
index e4de25756b..05f3f7d2ec 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBLmqSubscriptionGroupManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBLmqSubscriptionGroupManager.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.broker.subscription;
+package org.apache.rocketmq.broker.config.v1;
 
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.MixAll;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBLmqTopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBLmqTopicConfigManager.java
similarity index 97%
rename from broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBLmqTopicConfigManager.java
rename to broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBLmqTopicConfigManager.java
index d049a8dbcd..7b27801396 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBLmqTopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBLmqTopicConfigManager.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.broker.topic;
+package org.apache.rocketmq.broker.config.v1;
 
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.MixAll;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBOffsetSerializeWrapper.java
similarity index 96%
copy from broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
copy to broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBOffsetSerializeWrapper.java
index 7a90fd62fb..4801cfc681 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBOffsetSerializeWrapper.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.broker.offset;
+package org.apache.rocketmq.broker.config.v1;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
similarity index 98%
rename from broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java
rename to broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
index 5119f78672..8175d63cce 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.broker.subscription;
+package org.apache.rocketmq.broker.config.v1;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.function.BiConsumer;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.RocksDBConfigManager;
+import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.utils.DataConverter;
 import org.apache.rocketmq.remoting.protocol.DataVersion;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
similarity index 98%
rename from broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java
rename to broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
index 466e6416f9..bce67392f6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.broker.topic;
+package org.apache.rocketmq.broker.config.v1;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.serializer.SerializerFeature;
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.RocksDBConfigManager;
+import org.apache.rocketmq.broker.topic.TopicConfigManager;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.utils.DataConverter;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigHelper.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigHelper.java
new file mode 100644
index 0000000000..8183a1f835
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigHelper.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.config.v2;
+
+import com.alibaba.fastjson2.JSON;
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.common.config.AbstractRocksDBStorage;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+
+public class ConfigHelper {
+
+    /**
+     * <p>
+     * Layout of data version key:
+     * [table-prefix, 1 byte][table-id, 2 byte][record-prefix, 1 byte][data-version-bytes]
+     * </p>
+     *
+     * <p>
+     * Layout of data version value:
+     * [state-machine-version, 8 bytes][timestamp, 8 bytes][sequence counter, 8 bytes]
+     * </p>
+     *
+     * @throws RocksDBException if RocksDB raises an error
+     */
+    public static Optional<ByteBuf> loadDataVersion(ConfigStorage configStorage, TableId tableId)
+        throws RocksDBException {
+        int keyLen = 1 /* table-prefix */ + Short.BYTES /* table-id */ + 1 /* record-prefix */
+            + ConfigStorage.DATA_VERSION_KEY_BYTES.length;
+        ByteBuf keyBuf = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
+        try {
+            keyBuf.writeByte(TablePrefix.TABLE.getValue());
+            keyBuf.writeShort(tableId.getValue());
+            keyBuf.writeByte(RecordPrefix.DATA_VERSION.getValue());
+            keyBuf.writeBytes(ConfigStorage.DATA_VERSION_KEY_BYTES);
+            byte[] valueBytes = configStorage.get(keyBuf.nioBuffer());
+            if (null != valueBytes) {
+                ByteBuf valueBuf = Unpooled.wrappedBuffer(valueBytes);
+                return Optional.of(valueBuf);
+            }
+        } finally {
+            keyBuf.release();
+        }
+        return Optional.empty();
+    }
+
+    public static void stampDataVersion(WriteBatch writeBatch, DataVersion dataVersion, long stateMachineVersion)
+        throws RocksDBException {
+        // Increase data version
+        dataVersion.nextVersion(stateMachineVersion);
+
+        int keyLen = 1 /* table-prefix */ + Short.BYTES /* table-id */ + 1 /* record-prefix */
+            + ConfigStorage.DATA_VERSION_KEY_BYTES.length;
+        ByteBuf keyBuf = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
+        ByteBuf valueBuf = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(Long.BYTES * 3);
+        try {
+            keyBuf.writeByte(TablePrefix.TABLE.getValue());
+            keyBuf.writeShort(TableId.CONSUMER_OFFSET.getValue());
+            keyBuf.writeByte(RecordPrefix.DATA_VERSION.getValue());
+            keyBuf.writeBytes(ConfigStorage.DATA_VERSION_KEY_BYTES);
+            valueBuf.writeLong(dataVersion.getStateVersion());
+            valueBuf.writeLong(dataVersion.getTimestamp());
+            valueBuf.writeLong(dataVersion.getCounter().get());
+            writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer());
+        } finally {
+            keyBuf.release();
+            valueBuf.release();
+        }
+    }
+
+    public static void onDataVersionLoad(ByteBuf buf, DataVersion dataVersion) {
+        if (buf.readableBytes() == 8 /* state machine version */ + 8 /* timestamp */ + 8 /* counter */) {
+            long stateMachineVersion = buf.readLong();
+            long timestamp = buf.readLong();
+            long counter = buf.readLong();
+            dataVersion.setStateVersion(stateMachineVersion);
+            dataVersion.setTimestamp(timestamp);
+            dataVersion.setCounter(new AtomicLong(counter));
+        }
+        buf.release();
+    }
+
+    public static ByteBuf keyBufOf(TableId tableId, final String name) {
+        Preconditions.checkNotNull(name);
+        byte[] bytes = name.getBytes(StandardCharsets.UTF_8);
+        int keyLen = 1 /* table-prefix */ + 2 /* table-id */ + 1 /* record-type-prefix */ + 2 /* name-length */ + bytes.length;
+        ByteBuf keyBuf = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
+        keyBuf.writeByte(TablePrefix.TABLE.getValue());
+        keyBuf.writeShort(tableId.getValue());
+        keyBuf.writeByte(RecordPrefix.DATA.getValue());
+        keyBuf.writeShort(bytes.length);
+        keyBuf.writeBytes(bytes);
+        return keyBuf;
+    }
+
+    public static ByteBuf valueBufOf(final Object config, SerializationType serializationType) {
+        if (SerializationType.JSON == serializationType) {
+            byte[] payload = JSON.toJSONBytes(config);
+            ByteBuf valueBuf = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(1 + payload.length);
+            valueBuf.writeByte(SerializationType.JSON.getValue());
+            valueBuf.writeBytes(payload);
+            return valueBuf;
+        }
+        throw new RuntimeException("Unsupported serialization type: " + serializationType);
+    }
+
+    public static byte[] readBytes(final ByteBuf buf) {
+        byte[] bytes = new byte[buf.readableBytes()];
+        buf.readBytes(bytes);
+        return bytes;
+    }
+}
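
To make the encoding that keyBufOf() and valueBufOf() produce easier to follow, here is a minimal standalone sketch of the same record envelope. The concrete byte values are illustrative placeholders only; the real ones come from the TablePrefix, TableId, RecordPrefix and SerializationType enums introduced later in this patch, and the pooled allocator is replaced by Unpooled for simplicity.

    import com.alibaba.fastjson2.JSON;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import java.nio.charset.StandardCharsets;
    import java.util.Collections;

    public class EnvelopeSketch {
        public static void main(String[] args) {
            byte tablePrefix = 1;   // placeholder for TablePrefix.TABLE
            short tableId = 3;      // placeholder for a TableId value
            byte dataPrefix = 2;    // placeholder for RecordPrefix.DATA
            byte jsonType = 1;      // placeholder for SerializationType.JSON

            // Key: [table-prefix, 1 byte][table-id, 2 bytes][record-prefix, 1 byte][name-len, 2 bytes][name bytes]
            byte[] name = "demo-group".getBytes(StandardCharsets.UTF_8);
            ByteBuf key = Unpooled.buffer(1 + Short.BYTES + 1 + Short.BYTES + name.length);
            key.writeByte(tablePrefix).writeShort(tableId).writeByte(dataPrefix);
            key.writeShort(name.length).writeBytes(name);

            // Value: [serialization-type, 1 byte][payload bytes]
            byte[] payload = JSON.toJSONBytes(Collections.singletonMap("groupName", "demo-group"));
            ByteBuf value = Unpooled.buffer(1 + payload.length);
            value.writeByte(jsonType).writeBytes(payload);

            // Reading it back mirrors the consumers of this helper: peel the type byte, parse the rest as JSON.
            byte type = value.readByte();
            String json = value.readCharSequence(value.readableBytes(), StandardCharsets.UTF_8).toString();
            System.out.println("key bytes=" + key.readableBytes() + ", type=" + type + ", json=" + json);
        }
    }
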
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java
new file mode 100644
index 0000000000..af259aaa37
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.config.v2;
+
+import io.netty.util.internal.PlatformDependent;
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.config.AbstractRocksDBStorage;
+import org.apache.rocketmq.common.config.ConfigHelper;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DirectSlice;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+/**
+ * https://book.tidb.io/session1/chapter3/tidb-kv-to-relation.html
+ */
+public class ConfigStorage extends AbstractRocksDBStorage {
+
+    public static final String DATA_VERSION_KEY = "data_version";
+    public static final byte[] DATA_VERSION_KEY_BYTES = DATA_VERSION_KEY.getBytes(StandardCharsets.UTF_8);
+
+    public ConfigStorage(String storePath) {
+        super(storePath + File.separator + "config" + File.separator + "rdb");
+    }
+
+    @Override
+    protected boolean postLoad() {
+        if (!PlatformDependent.hasUnsafe()) {
+            LOGGER.error("Unsafe not available and POOLED_ALLOCATOR cannot work correctly");
+            return false;
+        }
+        try {
+            UtilAll.ensureDirOK(this.dbPath);
+            initOptions();
+            List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();
+
+            ColumnFamilyOptions defaultOptions = ConfigHelper.createConfigOptions();
+            this.cfOptions.add(defaultOptions);
+            cfDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, defaultOptions));
+
+            // Start RocksDB instance
+            open(cfDescriptors);
+
+            this.defaultCFHandle = cfHandles.get(0);
+        } catch (final Exception e) {
+            AbstractRocksDBStorage.LOGGER.error("postLoad Failed. {}", this.dbPath, e);
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    protected void preShutdown() {
+
+    }
+
+    protected void initOptions() {
+        this.options = ConfigHelper.createConfigDBOptions();
+        super.initOptions();
+    }
+
+    @Override
+    protected void initAbleWalWriteOptions() {
+        this.ableWalWriteOptions = new WriteOptions();
+
+        // For metadata, prioritize data integrity
+        this.ableWalWriteOptions.setSync(true);
+
+        // We need WAL for config changes
+        this.ableWalWriteOptions.setDisableWAL(false);
+
+        // Do not fail fast on write stalls; wait synchronously even if the write request has to wait
+        this.ableWalWriteOptions.setNoSlowdown(false);
+    }
+
+    public byte[] get(ByteBuffer key) throws RocksDBException {
+        byte[] keyBytes = new byte[key.remaining()];
+        key.get(keyBytes);
+        return super.get(getDefaultCFHandle(), totalOrderReadOptions, keyBytes);
+    }
+
+    public void write(WriteBatch writeBatch) throws RocksDBException {
+        db.write(ableWalWriteOptions, writeBatch);
+    }
+
+    public RocksIterator iterate(ByteBuffer beginKey, ByteBuffer endKey) {
+        try (ReadOptions readOptions = new ReadOptions()) {
+            readOptions.setTotalOrderSeek(true);
+            readOptions.setTailing(false);
+            readOptions.setAutoPrefixMode(true);
+            readOptions.setIterateLowerBound(new DirectSlice(beginKey));
+            readOptions.setIterateUpperBound(new DirectSlice(endKey));
+            RocksIterator iterator = db.newIterator(defaultCFHandle, readOptions);
+            iterator.seekToFirst();
+            return iterator;
+        }
+    }
+}
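
A hedged usage sketch of the storage class above, independent of the broker (the store path is a throwaway directory made up for the example; start() and shutdown() are the AbstractRocksDBStorage lifecycle methods that BrokerController invokes, and write()/get() are the methods added in this file):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import org.apache.rocketmq.broker.config.v2.ConfigStorage;
    import org.rocksdb.WriteBatch;

    public class ConfigStorageSketch {
        public static void main(String[] args) throws Exception {
            ConfigStorage storage = new ConfigStorage(System.getProperty("java.io.tmpdir") + "/config-storage-sketch");
            if (!storage.start()) {
                throw new IllegalStateException("config storage failed to start");
            }
            byte[] key = "demo-key".getBytes(StandardCharsets.UTF_8);
            byte[] value = "demo-value".getBytes(StandardCharsets.UTF_8);
            try (WriteBatch writeBatch = new WriteBatch()) {
                writeBatch.put(key, value);   // batched mutation, as the v2 managers do
                storage.write(writeBatch);    // synchronous, WAL-backed write (setSync(true) above)
            }
            byte[] read = storage.get(ByteBuffer.wrap(key));
            System.out.println(read == null ? "miss" : new String(read, StandardCharsets.UTF_8));
            storage.shutdown();
        }
    }
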
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
new file mode 100644
index 0000000000..5b0885c491
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.config.v2;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.internal.PlatformDependent;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.config.AbstractRocksDBStorage;
+import org.apache.rocketmq.store.MessageStore;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+
+/**
+ * <p>
+ * Layout of consumer offset key:
+ * [table-prefix, 1 byte][table-id, 2 bytes][record-prefix, 1 byte][group-len, 2 bytes][group bytes][CTRL_1, 1 byte]
+ * [topic-len, 2 bytes][topic bytes][CTRL_1, 1 byte][queue-id, 4 bytes]
+ * </p>
+ *
+ * <p>
+ * Layout of consumer offset value: [offset, 8 bytes]
+ * </p>
+ */
+public class ConsumerOffsetManagerV2 extends ConsumerOffsetManager {
+
+    private final ConfigStorage configStorage;
+
+    public ConsumerOffsetManagerV2(BrokerController brokerController, ConfigStorage configStorage) {
+        super(brokerController);
+        this.configStorage = configStorage;
+    }
+
+    @Override
+    protected void removeConsumerOffset(String topicAtGroup) {
+        if (!MixAll.isLmq(topicAtGroup)) {
+            super.removeConsumerOffset(topicAtGroup);
+        }
+
+        String[] topicGroup = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
+        if (topicGroup.length != 2) {
+            LOG.error("Invalid topic group: {}", topicAtGroup);
+            return;
+        }
+
+        byte[] topicBytes = topicGroup[0].getBytes(StandardCharsets.UTF_8);
+        byte[] groupBytes = topicGroup[1].getBytes(StandardCharsets.UTF_8);
+
+        int keyLen = 1 /* table-prefix */ + Short.BYTES /* table-id */ + 1 /* record-prefix */
+            + Short.BYTES /* group-len */ + groupBytes.length + 1 /* CTRL_1 */
+            + Short.BYTES + topicBytes.length + 1;
+        // [table-prefix, 1 byte][table-id, 2 bytes][record-prefix, 1 byte][group-len, 2 bytes][group-bytes][CTRL_1, 1 byte]
+        // [topic-len, 2 bytes][topic-bytes][CTRL_1]
+        ByteBuf beginKey = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
+        beginKey.writeByte(TablePrefix.TABLE.getValue());
+        beginKey.writeShort(TableId.CONSUMER_OFFSET.getValue());
+        beginKey.writeByte(RecordPrefix.DATA.getValue());
+        beginKey.writeShort(groupBytes.length);
+        beginKey.writeBytes(groupBytes);
+        beginKey.writeByte(AbstractRocksDBStorage.CTRL_1);
+        beginKey.writeShort(topicBytes.length);
+        beginKey.writeBytes(topicBytes);
+        beginKey.writeByte(AbstractRocksDBStorage.CTRL_1);
+
+        ByteBuf endKey = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
+        endKey.writeByte(TablePrefix.TABLE.getValue());
+        endKey.writeShort(TableId.CONSUMER_OFFSET.getValue());
+        endKey.writeByte(RecordPrefix.DATA.getValue());
+        endKey.writeShort(groupBytes.length);
+        endKey.writeBytes(groupBytes);
+        endKey.writeByte(AbstractRocksDBStorage.CTRL_1);
+        endKey.writeShort(topicBytes.length);
+        endKey.writeBytes(topicBytes);
+        endKey.writeByte(AbstractRocksDBStorage.CTRL_2);
+
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            // TODO: we have to make a copy here as WriteBatch lacks ByteBuffer API here
+            writeBatch.deleteRange(ConfigHelper.readBytes(beginKey), ConfigHelper.readBytes(endKey));
+            long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
+            ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion);
+            configStorage.write(writeBatch);
+        } catch (RocksDBException e) {
+            LOG.error("Failed to removeConsumerOffset, topicAtGroup={}", topicAtGroup, e);
+        } finally {
+            beginKey.release();
+            endKey.release();
+        }
+    }
+
+    @Override
+    public void removeOffset(String group) {
+        if (!MixAll.isLmq(group)) {
+            super.removeOffset(group);
+        }
+
+        byte[] groupBytes = group.getBytes(StandardCharsets.UTF_8);
+        int keyLen = 1 /* table-prefix */ + Short.BYTES /* table-id */ + 1 /* record-prefix */
+            + Short.BYTES /* group-len */ + groupBytes.length + 1 /* CTRL_1 */;
+
+        // [table-prefix, 1 byte][table-id, 2 bytes][record-prefix, 1 byte][group-len, 2 bytes][group bytes][CTRL_1, 1 byte]
+        ByteBuf beginKey = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
+        beginKey.writeByte(TablePrefix.TABLE.getValue());
+        beginKey.writeShort(TableId.CONSUMER_OFFSET.getValue());
+        beginKey.writeByte(RecordPrefix.DATA.getValue());
+        beginKey.writeShort(groupBytes.length);
+        beginKey.writeBytes(groupBytes);
+        beginKey.writeByte(AbstractRocksDBStorage.CTRL_1);
+
+        ByteBuf endKey = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
+        endKey.writeByte(TablePrefix.TABLE.getValue());
+        endKey.writeShort(TableId.CONSUMER_OFFSET.getValue());
+        endKey.writeByte(RecordPrefix.DATA.getValue());
+        endKey.writeShort(groupBytes.length);
+        endKey.writeBytes(groupBytes);
+        endKey.writeByte(AbstractRocksDBStorage.CTRL_2);
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            // TODO: we have to make a copy here as WriteBatch lacks ByteBuffer API here
+            writeBatch.deleteRange(ConfigHelper.readBytes(beginKey), ConfigHelper.readBytes(endKey));
+            MessageStore messageStore = brokerController.getMessageStore();
+            long stateMachineVersion = messageStore != null ? messageStore.getStateMachineVersion() : 0;
+            ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion);
+            configStorage.write(writeBatch);
+        } catch (RocksDBException e) {
+            LOG.error("Failed to remove consumer offsets by group={}", group, e);
+        } finally {
+            beginKey.release();
+            endKey.release();
+        }
+    }
+
+    /**
+     * <p>
+     * Layout of consumer offset key:
+     * [table-prefix, 1 byte][table-id, 2 bytes][record-prefix, 1 byte][group-len, 2 bytes][group bytes][CTRL_1, 1 byte]
+     * [topic-len, 2 bytes][topic bytes][CTRL_1, 1 byte][queue-id, 4 bytes]
+     * </p>
+     *
+     * <p>
+     * Layout of consumer offset value:
+     * [offset, 8 bytes]
+     * </p>
+     *
+     * @param clientHost The client that submits consumer offsets
+     * @param group      Group name
+     * @param topic      Topic name
+     * @param queueId    Queue ID
+     * @param offset     Consumer offset of the specified queue
+     */
+    @Override
+    public void commitOffset(String clientHost, String group, String topic, int queueId, long offset) {
+        String key = topic + TOPIC_GROUP_SEPARATOR + group;
+
+        // We maintain a copy of the classic consumer offset table in memory as it has a very limited memory footprint.
+        // For LMQ offsets, given the volume and number of these records, they are maintained in RocksDB
+        // directly. Frequently used LMQ consumer offsets should reside either in block-cache or MemTable, so read/write
+        // should be blazingly fast.
+        if (!MixAll.isLmq(topic)) {
+            if (offsetTable.containsKey(key)) {
+                offsetTable.get(key).put(queueId, offset);
+            } else {
+                ConcurrentMap<Integer, Long> map = new ConcurrentHashMap<>();
+                ConcurrentMap<Integer, Long> prev = offsetTable.putIfAbsent(key, map);
+                if (null != prev) {
+                    map = prev;
+                }
+                map.put(queueId, offset);
+            }
+        }
+
+        ByteBuf keyBuf = keyOfConsumerOffset(group, topic, queueId);
+        ByteBuf valueBuf = ConfigStorage.POOLED_ALLOCATOR.buffer(Long.BYTES);
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            valueBuf.writeLong(offset);
+            writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer());
+            MessageStore messageStore = brokerController.getMessageStore();
+            long stateMachineVersion = messageStore != null ? messageStore.getStateMachineVersion() : 0;
+            ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion);
+            configStorage.write(writeBatch);
+        } catch (RocksDBException e) {
+            LOG.error("Failed to commit consumer offset", e);
+        } finally {
+            keyBuf.release();
+            valueBuf.release();
+        }
+    }
+
+    private ByteBuf keyOfConsumerOffset(String group, String topic, int queueId) {
+        byte[] groupBytes = group.getBytes(StandardCharsets.UTF_8);
+        byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
+        int keyLen = 1 /*table prefix*/ + Short.BYTES /*table-id*/ + 1 /*record-prefix*/
+            + Short.BYTES /*group-len*/ + groupBytes.length + 1 /*CTRL_1*/
+            + 2 /*topic-len*/ + topicBytes.length + 1 /* CTRL_1*/
+            + Integer.BYTES /*queue-id*/;
+        ByteBuf keyBuf = ConfigStorage.POOLED_ALLOCATOR.buffer(keyLen);
+        keyBuf.writeByte(TablePrefix.TABLE.getValue());
+        keyBuf.writeShort(TableId.CONSUMER_OFFSET.getValue());
+        keyBuf.writeByte(RecordPrefix.DATA.getValue());
+        keyBuf.writeShort(groupBytes.length);
+        keyBuf.writeBytes(groupBytes);
+        keyBuf.writeByte(AbstractRocksDBStorage.CTRL_1);
+        keyBuf.writeShort(topicBytes.length);
+        keyBuf.writeBytes(topicBytes);
+        keyBuf.writeByte(AbstractRocksDBStorage.CTRL_1);
+        keyBuf.writeInt(queueId);
+        return keyBuf;
+    }
+
+    private ByteBuf keyOfPullOffset(String group, String topic, int queueId) {
+        byte[] groupBytes = group.getBytes(StandardCharsets.UTF_8);
+        byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
+        int keyLen = 1 /*table prefix*/ + Short.BYTES /*table-id*/ + 1 /*record-prefix*/
+            + Short.BYTES /*group-len*/ + groupBytes.length + 1 /*CTRL_1*/
+            + 2 /*topic-len*/ + topicBytes.length + 1 /* CTRL_1*/
+            + Integer.BYTES /*queue-id*/;
+        ByteBuf keyBuf = ConfigStorage.POOLED_ALLOCATOR.buffer(keyLen);
+        keyBuf.writeByte(TablePrefix.TABLE.getValue());
+        keyBuf.writeShort(TableId.PULL_OFFSET.getValue());
+        keyBuf.writeByte(RecordPrefix.DATA.getValue());
+        keyBuf.writeShort(groupBytes.length);
+        keyBuf.writeBytes(groupBytes);
+        keyBuf.writeByte(AbstractRocksDBStorage.CTRL_1);
+        keyBuf.writeShort(topicBytes.length);
+        keyBuf.writeBytes(topicBytes);
+        keyBuf.writeByte(AbstractRocksDBStorage.CTRL_1);
+        keyBuf.writeInt(queueId);
+        return keyBuf;
+    }
+
+    @Override
+    public boolean load() {
+        return loadDataVersion() && loadConsumerOffsets();
+    }
+
+    @Override
+    public synchronized void persist() {
+        try {
+            configStorage.flushWAL();
+        } catch (RocksDBException e) {
+            LOG.error("Failed to flush RocksDB config instance WAL", e);
+        }
+    }
+
+    /**
+     * <p>
+     * Layout of data version key:
+     * [table-prefix, 1 byte][table-id, 2 byte][record-prefix, 1 byte][data-version-bytes]
+     * </p>
+     *
+     * <p>
+     * Layout of data version value:
+     * [state-machine-version, 8 bytes][timestamp, 8 bytes][sequence counter, 8 bytes]
+     * </p>
+     */
+    public boolean loadDataVersion() {
+        try {
+            ConfigHelper.loadDataVersion(configStorage, TableId.CONSUMER_OFFSET)
+                .ifPresent(buf -> ConfigHelper.onDataVersionLoad(buf, dataVersion));
+        } catch (RocksDBException e) {
+            LOG.error("Failed to load RocksDB config", e);
+            return false;
+        }
+        return true;
+    }
+
+    private boolean loadConsumerOffsets() {
+        // [table-prefix, 1 byte][table-id, 2 bytes][record-prefix, 1 byte]
+        ByteBuf beginKeyBuf = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(4);
+        beginKeyBuf.writeByte(TablePrefix.TABLE.getValue());
+        beginKeyBuf.writeShort(TableId.CONSUMER_OFFSET.getValue());
+        beginKeyBuf.writeByte(RecordPrefix.DATA.getValue());
+
+        ByteBuf endKeyBuf = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(4);
+        endKeyBuf.writeByte(TablePrefix.TABLE.getValue());
+        endKeyBuf.writeShort(TableId.CONSUMER_OFFSET.getValue());
+        endKeyBuf.writeByte(RecordPrefix.DATA.getValue() + 1);
+
+        try (RocksIterator iterator = configStorage.iterate(beginKeyBuf.nioBuffer(), endKeyBuf.nioBuffer())) {
+            int keyCapacity = 256;
+            // We may iterate millions of LMQ consumer offsets here, so use direct byte buffers to avoid memory
+            // fragmentation
+            ByteBuffer keyBuffer = ByteBuffer.allocateDirect(keyCapacity);
+            ByteBuffer valueBuffer = ByteBuffer.allocateDirect(Long.BYTES);
+            while (iterator.isValid()) {
+                keyBuffer.clear();
+                valueBuffer.clear();
+
+                int len = iterator.key(keyBuffer);
+                if (len > keyCapacity) {
+                    keyCapacity = len;
+                    PlatformDependent.freeDirectBuffer(keyBuffer);
+                    // Reserve more space for key
+                    keyBuffer = ByteBuffer.allocateDirect(keyCapacity);
+                    continue;
+                }
+                len = iterator.value(valueBuffer);
+                assert len == Long.BYTES;
+
+                // skip table-prefix, table-id, record-prefix
+                keyBuffer.position(1 + 2 + 1);
+                short groupLen = keyBuffer.getShort();
+                byte[] groupBytes = new byte[groupLen];
+                keyBuffer.get(groupBytes);
+                byte ctrl = keyBuffer.get();
+                assert ctrl == AbstractRocksDBStorage.CTRL_1;
+
+                short topicLen = keyBuffer.getShort();
+                byte[] topicBytes = new byte[topicLen];
+                keyBuffer.get(topicBytes);
+                String topic = new String(topicBytes, StandardCharsets.UTF_8);
+                ctrl = keyBuffer.get();
+                assert ctrl == AbstractRocksDBStorage.CTRL_1;
+
+                int queueId = keyBuffer.getInt();
+
+                long offset = valueBuffer.getLong();
+
+                if (!MixAll.isLmq(topic)) {
+                    String group = new String(groupBytes, StandardCharsets.UTF_8);
+                    onConsumerOffsetRecordLoad(topic, group, queueId, offset);
+                }
+                iterator.next();
+            }
+            PlatformDependent.freeDirectBuffer(keyBuffer);
+            PlatformDependent.freeDirectBuffer(valueBuffer);
+        } finally {
+            beginKeyBuf.release();
+            endKeyBuf.release();
+        }
+        return true;
+    }
+
+    private void onConsumerOffsetRecordLoad(String topic, String group, int queueId, long offset) {
+        if (MixAll.isLmq(topic)) {
+            return;
+        }
+        String key = topic + TOPIC_GROUP_SEPARATOR + group;
+        if (!offsetTable.containsKey(key)) {
+            ConcurrentMap<Integer, Long> map = new ConcurrentHashMap<>();
+            offsetTable.putIfAbsent(key, map);
+        }
+        offsetTable.get(key).put(queueId, offset);
+    }
+
+    @Override
+    public long queryOffset(String group, String topic, int queueId) {
+        if (!MixAll.isLmq(topic)) {
+            return super.queryOffset(group, topic, queueId);
+        }
+
+        ByteBuf keyBuf = keyOfConsumerOffset(group, topic, queueId);
+        try {
+            byte[] slice = configStorage.get(keyBuf.nioBuffer());
+            if (null == slice) {
+                return -1;
+            }
+            assert slice.length == Long.BYTES;
+            return ByteBuffer.wrap(slice).getLong();
+        } catch (RocksDBException e) {
+            throw new RuntimeException(e);
+        } finally {
+            keyBuf.release();
+        }
+    }
+
+    @Override
+    public void commitPullOffset(String clientHost, String group, String topic, int queueId, long offset) {
+        if (!MixAll.isLmq(topic)) {
+            super.commitPullOffset(clientHost, group, topic, queueId, offset);
+        }
+
+        ByteBuf keyBuf = keyOfPullOffset(group, topic, queueId);
+        ByteBuf valueBuf = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(8);
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            valueBuf.writeLong(offset);
+            writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer());
+            long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
+            ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion);
+            configStorage.write(writeBatch);
+        } catch (RocksDBException e) {
+            LOG.error("Failed to commit pull offset. group={}, topic={}, queueId={}, offset={}", group, topic, queueId, offset, e);
+        } finally {
+            keyBuf.release();
+            valueBuf.release();
+        }
+    }
+
+    @Override
+    public long queryPullOffset(String group, String topic, int queueId) {
+        if (!MixAll.isLmq(topic)) {
+            return super.queryPullOffset(group, topic, queueId);
+        }
+
+        ByteBuf keyBuf = keyOfPullOffset(group, topic, queueId);
+        try {
+            byte[] valueBytes = configStorage.get(keyBuf.nioBuffer());
+            if (null == valueBytes) {
+                return -1;
+            }
+            return ByteBuffer.wrap(valueBytes).getLong();
+        } catch (RocksDBException e) {
+            LOG.error("Failed to queryPullOffset. group={}, topic={}, queueId={}", group, topic, queueId, e);
+        } finally {
+            keyBuf.release();
+        }
+        return -1;
+    }
+}
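
The following self-contained sketch encodes and then decodes one consumer-offset key with plain java.nio, mirroring keyOfConsumerOffset() and the parsing loop in loadConsumerOffsets(). The prefix and table-id bytes are illustrative placeholders, and CTRL_1 is assumed to be the 0x01 control byte exposed by AbstractRocksDBStorage:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class ConsumerOffsetKeySketch {
        // Placeholders; real values come from TablePrefix.TABLE, TableId.CONSUMER_OFFSET,
        // RecordPrefix.DATA and AbstractRocksDBStorage.CTRL_1 (assumed 0x01).
        static final byte TABLE = 1, DATA = 2, CTRL_1 = 0x01;
        static final short CONSUMER_OFFSET_TABLE = 1;

        public static void main(String[] args) {
            byte[] group = "demo-group".getBytes(StandardCharsets.UTF_8);
            byte[] topic = "demo-topic".getBytes(StandardCharsets.UTF_8);
            int queueId = 3;

            // Encode: [table-prefix][table-id][record-prefix][group-len][group][CTRL_1][topic-len][topic][CTRL_1][queue-id]
            ByteBuffer key = ByteBuffer.allocate(1 + Short.BYTES + 1 + Short.BYTES + group.length + 1
                + Short.BYTES + topic.length + 1 + Integer.BYTES);
            key.put(TABLE).putShort(CONSUMER_OFFSET_TABLE).put(DATA)
                .putShort((short) group.length).put(group).put(CTRL_1)
                .putShort((short) topic.length).put(topic).put(CTRL_1)
                .putInt(queueId);
            key.flip();

            // Decode, as loadConsumerOffsets() does: skip the 4 fixed header bytes, then read length-prefixed fields.
            key.position(1 + 2 + 1);
            byte[] groupBytes = new byte[key.getShort()];
            key.get(groupBytes);
            key.get(); // CTRL_1 separator
            byte[] topicBytes = new byte[key.getShort()];
            key.get(topicBytes);
            key.get(); // CTRL_1 separator
            System.out.printf("group=%s topic=%s queueId=%d%n",
                new String(groupBytes, StandardCharsets.UTF_8),
                new String(topicBytes, StandardCharsets.UTF_8), key.getInt());
        }
    }
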
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/RecordPrefix.java
similarity index 59%
copy from broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
copy to broker/src/main/java/org/apache/rocketmq/broker/config/v2/RecordPrefix.java
index 7a90fd62fb..750d454d4e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/RecordPrefix.java
@@ -14,21 +14,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.broker.offset;
+package org.apache.rocketmq.broker.config.v2;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+public enum RecordPrefix {
+    UNSPECIFIED((byte)0),
+    DATA_VERSION((byte)1),
+    DATA((byte)2);
 
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+    private final byte value;
 
-public class RocksDBOffsetSerializeWrapper extends RemotingSerializable {
-    private ConcurrentMap<Integer, Long> offsetTable = new ConcurrentHashMap(16);
-
-    public ConcurrentMap<Integer, Long> getOffsetTable() {
-        return offsetTable;
+    RecordPrefix(byte value) {
+        this.value = value;
     }
 
-    public void setOffsetTable(ConcurrentMap<Integer, Long> offsetTable) {
-        this.offsetTable = offsetTable;
+    public byte getValue() {
+        return value;
     }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SerializationType.java
similarity index 57%
copy from broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
copy to broker/src/main/java/org/apache/rocketmq/broker/config/v2/SerializationType.java
index 7a90fd62fb..2ee157fdc8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SerializationType.java
@@ -14,21 +14,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.broker.offset;
+package org.apache.rocketmq.broker.config.v2;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+public enum SerializationType {
+    UNSPECIFIED((byte) 0),
 
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+    JSON((byte) 1),
 
-public class RocksDBOffsetSerializeWrapper extends RemotingSerializable {
-    private ConcurrentMap<Integer, Long> offsetTable = new ConcurrentHashMap(16);
+    PROTOBUF((byte) 2),
 
-    public ConcurrentMap<Integer, Long> getOffsetTable() {
-        return offsetTable;
+    FLAT_BUFFERS((byte) 3);
+
+    private final byte value;
+
+    SerializationType(byte value) {
+        this.value = value;
+    }
+
+    public byte getValue() {
+        return value;
     }
 
-    public void setOffsetTable(ConcurrentMap<Integer, Long> offsetTable) {
-        this.offsetTable = offsetTable;
+    public static SerializationType valueOf(byte value) {
+        for (SerializationType type : SerializationType.values()) {
+            if (type.getValue() == value) {
+                return type;
+            }
+        }
+        return SerializationType.UNSPECIFIED;
     }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java
new file mode 100644
index 0000000000..8da6f9d2bc
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.config.v2;
+
+import com.alibaba.fastjson2.JSON;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.nio.charset.StandardCharsets;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.config.AbstractRocksDBStorage;
+import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+
+public class SubscriptionGroupManagerV2 extends SubscriptionGroupManager {
+
+    private final ConfigStorage configStorage;
+
+    public SubscriptionGroupManagerV2(BrokerController brokerController, ConfigStorage configStorage) {
+        super(brokerController);
+        this.configStorage = configStorage;
+    }
+
+    @Override
+    public boolean load() {
+        return loadDataVersion() && loadSubscriptions();
+    }
+
+    public boolean loadDataVersion() {
+        try {
+            ConfigHelper.loadDataVersion(configStorage, TableId.SUBSCRIPTION_GROUP)
+                .ifPresent(buf -> {
+                    ConfigHelper.onDataVersionLoad(buf, dataVersion);
+                });
+        } catch (RocksDBException e) {
+            log.error("loadDataVersion error", e);
+            return false;
+        }
+        return true;
+    }
+
+    private boolean loadSubscriptions() {
+        int keyLen = 1 /* table prefix */ + 2 /* table-id */ + 1 /* record-type-prefix */;
+        ByteBuf beginKey = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
+        beginKey.writeByte(TablePrefix.TABLE.getValue());
+        beginKey.writeShort(TableId.SUBSCRIPTION_GROUP.getValue());
+        beginKey.writeByte(RecordPrefix.DATA.getValue());
+
+        ByteBuf endKey = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
+        endKey.writeByte(TablePrefix.TABLE.getValue());
+        endKey.writeShort(TableId.SUBSCRIPTION_GROUP.getValue());
+        endKey.writeByte(RecordPrefix.DATA.getValue() + 1);
+
+        try (RocksIterator iterator = configStorage.iterate(beginKey.nioBuffer(), endKey.nioBuffer())) {
+            while (iterator.isValid()) {
+                SubscriptionGroupConfig subscriptionGroupConfig = parseSubscription(iterator.key(), iterator.value());
+                if (null != subscriptionGroupConfig) {
+                    super.updateSubscriptionGroupConfigWithoutPersist(subscriptionGroupConfig);
+                }
+                iterator.next();
+            }
+        } finally {
+            beginKey.release();
+            endKey.release();
+        }
+        return true;
+    }
+
+    private SubscriptionGroupConfig parseSubscription(byte[] key, byte[] value) {
+        ByteBuf keyBuf = Unpooled.wrappedBuffer(key);
+        ByteBuf valueBuf = Unpooled.wrappedBuffer(value);
+        try {
+            // Skip table-prefix, table-id, record-type-prefix
+            keyBuf.readerIndex(4);
+            short groupNameLen = keyBuf.readShort();
+            assert groupNameLen == keyBuf.readableBytes();
+            CharSequence groupName = keyBuf.readCharSequence(groupNameLen, StandardCharsets.UTF_8);
+            assert null != groupName;
+            byte serializationType = valueBuf.readByte();
+            if (SerializationType.JSON == SerializationType.valueOf(serializationType)) {
+                CharSequence json = valueBuf.readCharSequence(valueBuf.readableBytes(), StandardCharsets.UTF_8);
+                SubscriptionGroupConfig subscriptionGroupConfig = JSON.parseObject(json.toString(), SubscriptionGroupConfig.class);
+                assert subscriptionGroupConfig != null;
+                assert groupName.equals(subscriptionGroupConfig.getGroupName());
+                return subscriptionGroupConfig;
+            }
+        } finally {
+            keyBuf.release();
+            valueBuf.release();
+        }
+        return null;
+    }
+
+    @Override
+    public synchronized void persist() {
+        try {
+            configStorage.flushWAL();
+        } catch (RocksDBException e) {
+            log.error("Failed to flush RocksDB WAL", e);
+        }
+    }
+
+    @Override
+    public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {
+        if (MixAll.isLmq(group)) {
+            SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+            subscriptionGroupConfig.setGroupName(group);
+            return subscriptionGroupConfig;
+        }
+        return super.findSubscriptionGroupConfig(group);
+    }
+
+    @Override
+    public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {
+        if (config == null || MixAll.isLmq(config.getGroupName())) {
+            return;
+        }
+        ByteBuf keyBuf = ConfigHelper.keyBufOf(TableId.SUBSCRIPTION_GROUP, config.getGroupName());
+        ByteBuf valueBuf = ConfigHelper.valueBufOf(config, SerializationType.JSON);
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer());
+            long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
+            ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion);
+            configStorage.write(writeBatch);
+        } catch (RocksDBException e) {
+            log.error("update subscription group config error", e);
+        } finally {
+            keyBuf.release();
+            valueBuf.release();
+        }
+        super.updateSubscriptionGroupConfigWithoutPersist(config);
+    }
+
+    @Override
+    public boolean containsSubscriptionGroup(String group) {
+        if (MixAll.isLmq(group)) {
+            return true;
+        } else {
+            return super.containsSubscriptionGroup(group);
+        }
+    }
+
+    @Override
+    protected SubscriptionGroupConfig removeSubscriptionGroupConfig(String groupName) {
+        ByteBuf keyBuf = ConfigHelper.keyBufOf(TableId.SUBSCRIPTION_GROUP, groupName);
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            writeBatch.delete(ConfigHelper.readBytes(keyBuf));
+            long stateMachineVersion = brokerController.getMessageStore().getStateMachineVersion();
+            ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion);
+            // Submit the batch so the delete and the data-version stamp actually reach RocksDB.
+            configStorage.write(writeBatch);
+        } catch (RocksDBException e) {
+            log.error("Failed to remove subscription group config by group-name={}", groupName, e);
+        } finally {
+            keyBuf.release();
+        }
+        return super.removeSubscriptionGroupConfig(groupName);
+    }
+}
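
For orientation, the keys written above always come from ConfigHelper.keyBufOf(...). The sketch below is only an approximation of what such a key looks like for a subscription-group record, based on the key layout documented in this patch; it is not the actual ConfigHelper code, and it uses Unpooled instead of the pooled allocator.

    // Hedged sketch: approximate key layout for a subscription-group record,
    // i.e. [table-prefix, 1B][table-id, 2B][record-type-prefix, 1B][name-len, 2B][name-bytes].
    // Not the real ConfigHelper.keyBufOf implementation.
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import java.nio.charset.StandardCharsets;
    import org.apache.rocketmq.broker.config.v2.RecordPrefix;
    import org.apache.rocketmq.broker.config.v2.TableId;
    import org.apache.rocketmq.broker.config.v2.TablePrefix;

    class SubscriptionKeySketch {
        static ByteBuf keyOf(String groupName) {
            byte[] name = groupName.getBytes(StandardCharsets.UTF_8);
            ByteBuf key = Unpooled.buffer(1 + 2 + 1 + 2 + name.length);
            key.writeByte(TablePrefix.TABLE.getValue());            // table-prefix
            key.writeShort(TableId.SUBSCRIPTION_GROUP.getValue());  // table-id (big-endian)
            key.writeByte(RecordPrefix.DATA.getValue());            // record-type-prefix
            key.writeShort(name.length);                            // group-name length
            key.writeBytes(name);                                   // group-name bytes
            return key;
        }
    }
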
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
 b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TableId.java
similarity index 59%
copy from 
broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
copy to broker/src/main/java/org/apache/rocketmq/broker/config/v2/TableId.java
index 7a90fd62fb..7a61899371 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TableId.java
@@ -14,21 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.broker.offset;
+package org.apache.rocketmq.broker.config.v2;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+/**
+ * See <a href="https://book.tidb.io/session1/chapter3/tidb-kv-to-relation.html">Table, Key Value Mapping</a>
+ */
+public enum TableId {
+    UNSPECIFIED((short) 0),
+    CONSUMER_OFFSET((short) 1),
+    PULL_OFFSET((short) 2),
+    TOPIC((short) 3),
+    SUBSCRIPTION_GROUP((short) 4);
 
-public class RocksDBOffsetSerializeWrapper extends RemotingSerializable {
-    private ConcurrentMap<Integer, Long> offsetTable = new 
ConcurrentHashMap(16);
+    private final short value;
 
-    public ConcurrentMap<Integer, Long> getOffsetTable() {
-        return offsetTable;
+    TableId(short value) {
+        this.value = value;
     }
 
-    public void setOffsetTable(ConcurrentMap<Integer, Long> offsetTable) {
-        this.offsetTable = offsetTable;
+    public short getValue() {
+        return value;
     }
 }
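
TableId only exposes getValue(); this commit does not ship a reverse lookup. If one were ever needed when decoding keys, a minimal hypothetical helper (not part of this commit) could look like:

    // Hypothetical reverse lookup for TableId; not part of this commit.
    import org.apache.rocketmq.broker.config.v2.TableId;

    class TableIdLookup {
        static TableId tableIdOf(short value) {
            for (TableId id : TableId.values()) {
                if (id.getValue() == value) {
                    return id;
                }
            }
            return TableId.UNSPECIFIED;
        }
    }
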
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
 b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TablePrefix.java
similarity index 59%
copy from 
broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
copy to 
broker/src/main/java/org/apache/rocketmq/broker/config/v2/TablePrefix.java
index 7a90fd62fb..d16c14d275 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TablePrefix.java
@@ -14,21 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.broker.offset;
+package org.apache.rocketmq.broker.config.v2;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+public enum TablePrefix {
+    UNSPECIFIED((byte) 0),
+    TABLE((byte) 1);
 
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+    private final byte value;
 
-public class RocksDBOffsetSerializeWrapper extends RemotingSerializable {
-    private ConcurrentMap<Integer, Long> offsetTable = new 
ConcurrentHashMap(16);
-
-    public ConcurrentMap<Integer, Long> getOffsetTable() {
-        return offsetTable;
+    TablePrefix(byte value) {
+        this.value = value;
     }
 
-    public void setOffsetTable(ConcurrentMap<Integer, Long> offsetTable) {
-        this.offsetTable = offsetTable;
+    public byte getValue() {
+        return value;
     }
 }
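
TablePrefix and RecordPrefix together explain the scan bounds used by the loaders in this patch: each builds a begin key ending in RecordPrefix.DATA and an end key ending in RecordPrefix.DATA + 1. The sketch below restates that pattern, assuming ConfigStorage.iterate treats the end key as an exclusive upper bound, which is how both loaders use it.

    // Hedged sketch: [beginKey, endKey) bounds for a prefix scan over DATA records of one table.
    // Assumes the end key is exclusive, matching loadTopicConfig/loadSubscriptions in this patch.
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import org.apache.rocketmq.broker.config.v2.RecordPrefix;
    import org.apache.rocketmq.broker.config.v2.TableId;
    import org.apache.rocketmq.broker.config.v2.TablePrefix;

    class ScanBoundsSketch {
        static ByteBuf[] dataScanBounds(TableId tableId) {
            ByteBuf beginKey = Unpooled.buffer(4);
            beginKey.writeByte(TablePrefix.TABLE.getValue());
            beginKey.writeShort(tableId.getValue());
            beginKey.writeByte(RecordPrefix.DATA.getValue());

            ByteBuf endKey = Unpooled.buffer(4);
            endKey.writeByte(TablePrefix.TABLE.getValue());
            endKey.writeShort(tableId.getValue());
            // One past the DATA prefix: an exclusive upper bound covering every DATA key.
            endKey.writeByte(RecordPrefix.DATA.getValue() + 1);
            return new ByteBuf[] {beginKey, endKey};
        }
    }
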
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2.java
new file mode 100644
index 0000000000..b1a3d2d85c
--- /dev/null
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.config.v2;
+
+import com.alibaba.fastjson2.JSON;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.nio.charset.StandardCharsets;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.topic.TopicConfigManager;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.config.AbstractRocksDBStorage;
+import org.apache.rocketmq.common.constant.PermName;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+
+/**
+ * Key layout: [table-prefix, 1 byte][table-id, 2 bytes][record-type-prefix, 1 byte][topic-len, 2 bytes][topic-bytes]
+ * Value layout: [serialization-type, 1 byte][topic-config-bytes]
+ */
+public class TopicConfigManagerV2 extends TopicConfigManager {
+    private final ConfigStorage configStorage;
+
+    public TopicConfigManagerV2(BrokerController brokerController, ConfigStorage configStorage) {
+        super(brokerController);
+        this.configStorage = configStorage;
+    }
+
+    @Override
+    public boolean load() {
+        return loadDataVersion() && loadTopicConfig();
+    }
+
+    public boolean loadDataVersion() {
+        try {
+            ConfigHelper.loadDataVersion(configStorage, TableId.TOPIC)
+                .ifPresent(buf -> ConfigHelper.onDataVersionLoad(buf, dataVersion));
+        } catch (RocksDBException e) {
+            log.error("Failed to load data version of topic", e);
+            return false;
+        }
+        return true;
+    }
+
+    private boolean loadTopicConfig() {
+        int keyLen = 1 /* table-prefix */ + 2 /* table-id */ + 1 /* record-type-prefix */;
+        ByteBuf beginKey = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
+        beginKey.writeByte(TablePrefix.TABLE.getValue());
+        beginKey.writeShort(TableId.TOPIC.getValue());
+        beginKey.writeByte(RecordPrefix.DATA.getValue());
+
+        ByteBuf endKey = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
+        endKey.writeByte(TablePrefix.TABLE.getValue());
+        endKey.writeShort(TableId.TOPIC.getValue());
+        endKey.writeByte(RecordPrefix.DATA.getValue() + 1);
+
+        try (RocksIterator iterator = configStorage.iterate(beginKey.nioBuffer(), endKey.nioBuffer())) {
+            while (iterator.isValid()) {
+                byte[] key = iterator.key();
+                byte[] value = iterator.value();
+                TopicConfig topicConfig = parseTopicConfig(key, value);
+                if (null != topicConfig) {
+                    super.updateSingleTopicConfigWithoutPersist(topicConfig);
+                }
+                iterator.next();
+            }
+        } finally {
+            beginKey.release();
+            endKey.release();
+        }
+        return true;
+    }
+
+    /**
+     * Key layout: [table-prefix, 1 byte][table-id, 2 bytes][record-type-prefix, 1 byte][topic-len, 2 bytes][topic-bytes]
+     * Value layout: [serialization-type, 1 byte][topic-config-bytes]
+     *
+     * @param key   Topic config key representation in RocksDB
+     * @param value Topic config value representation in RocksDB
+     * @return decoded topic config
+     */
+    private TopicConfig parseTopicConfig(byte[] key, byte[] value) {
+        ByteBuf keyBuf = Unpooled.wrappedBuffer(key);
+        ByteBuf valueBuf = Unpooled.wrappedBuffer(value);
+        try {
+            // Skip table-prefix, table-id, record-type-prefix
+            keyBuf.readerIndex(4);
+            short topicLen = keyBuf.readShort();
+            assert topicLen == keyBuf.readableBytes();
+            CharSequence topic = keyBuf.readCharSequence(topicLen, StandardCharsets.UTF_8);
+            assert null != topic;
+
+            byte serializationType = valueBuf.readByte();
+            if (SerializationType.JSON == SerializationType.valueOf(serializationType)) {
+                CharSequence json = valueBuf.readCharSequence(valueBuf.readableBytes(), StandardCharsets.UTF_8);
+                TopicConfig topicConfig = JSON.parseObject(json.toString(), TopicConfig.class);
+                assert topicConfig != null;
+                assert topic.equals(topicConfig.getTopicName());
+                return topicConfig;
+            }
+        } finally {
+            keyBuf.release();
+            valueBuf.release();
+        }
+
+        return null;
+    }
+
+    @Override
+    public synchronized void persist() {
+        try {
+            configStorage.flushWAL();
+        } catch (RocksDBException e) {
+            log.error("Failed to flush WAL", e);
+        }
+    }
+
+    @Override
+    public TopicConfig selectTopicConfig(final String topic) {
+        if (MixAll.isLmq(topic)) {
+            return simpleLmqTopicConfig(topic);
+        }
+        return super.selectTopicConfig(topic);
+    }
+
+    @Override
+    public void updateTopicConfig(final TopicConfig topicConfig) {
+        if (topicConfig == null || MixAll.isLmq(topicConfig.getTopicName())) {
+            return;
+        }
+        super.updateSingleTopicConfigWithoutPersist(topicConfig);
+
+        ByteBuf keyBuf = ConfigHelper.keyBufOf(TableId.TOPIC, topicConfig.getTopicName());
+        ByteBuf valueBuf = ConfigHelper.valueBufOf(topicConfig, SerializationType.JSON);
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer());
+            long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
+            ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion);
+            configStorage.write(writeBatch);
+        } catch (RocksDBException e) {
+            log.error("Failed to update topic config", e);
+        } finally {
+            keyBuf.release();
+            valueBuf.release();
+        }
+    }
+
+    @Override
+    protected TopicConfig removeTopicConfig(String topicName) {
+        ByteBuf keyBuf = ConfigHelper.keyBufOf(TableId.TOPIC, topicName);
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            writeBatch.delete(keyBuf.nioBuffer());
+            long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
+            ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion);
+            configStorage.write(writeBatch);
+        } catch (RocksDBException e) {
+            log.error("Failed to delete topic config by topicName={}", topicName, e);
+        } finally {
+            keyBuf.release();
+        }
+        return super.removeTopicConfig(topicName);
+    }
+
+    @Override
+    public boolean containsTopic(String topic) {
+        if (MixAll.isLmq(topic)) {
+            return true;
+        }
+        return super.containsTopic(topic);
+    }
+
+    private TopicConfig simpleLmqTopicConfig(String topic) {
+        return new TopicConfig(topic, 1, 1, PermName.PERM_READ | PermName.PERM_WRITE);
+    }
+}
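
The value layout is the mirror image of what parseTopicConfig reads: one serialization-type byte followed by the serialized payload. A hedged sketch of the encoding side is shown below; the commit's actual encoder is ConfigHelper.valueBufOf(...), and the SerializationType.JSON.getValue() accessor used here is an assumption, since that enum's body is not shown in this excerpt.

    // Hedged sketch of the value layout [serialization-type, 1 byte][topic-config-bytes].
    // Illustrative only; SerializationType.JSON.getValue() is an assumed accessor.
    import com.alibaba.fastjson2.JSON;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import java.nio.charset.StandardCharsets;
    import org.apache.rocketmq.broker.config.v2.SerializationType;
    import org.apache.rocketmq.common.TopicConfig;

    class TopicValueSketch {
        static ByteBuf valueOf(TopicConfig topicConfig) {
            byte[] json = JSON.toJSONString(topicConfig).getBytes(StandardCharsets.UTF_8);
            ByteBuf value = Unpooled.buffer(1 + json.length);
            value.writeByte(SerializationType.JSON.getValue()); // serialization-type marker (assumed accessor)
            value.writeBytes(json);                             // JSON payload
            return value;
        }
    }
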
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
 b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/package-info.java
similarity index 58%
copy from 
broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
copy to 
broker/src/main/java/org/apache/rocketmq/broker/config/v2/package-info.java
index 7a90fd62fb..1ea216193c 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/package-info.java
@@ -14,21 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.broker.offset;
+package org.apache.rocketmq.broker.config.v2;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
-
-public class RocksDBOffsetSerializeWrapper extends RemotingSerializable {
-    private ConcurrentMap<Integer, Long> offsetTable = new 
ConcurrentHashMap(16);
-
-    public ConcurrentMap<Integer, Long> getOffsetTable() {
-        return offsetTable;
-    }
-
-    public void setOffsetTable(ConcurrentMap<Integer, Long> offsetTable) {
-        this.offsetTable = offsetTable;
-    }
-}
+/*
+ * <strong>Endian</strong>: we use network byte order for all integrals, i.e., always big-endian.
+ *
+ * Unlike the v1 config managers, implementations in this package prioritize data integrity and reliability.
+ * As a result, the RocksDB write-ahead log is always on and changes are flushed immediately. Another
+ * significant difference is that the heap-based cache is removed, because it is unnecessary and duplicates
+ * the RocksDB MemTable/BlockCache.
+ */
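
A quick note on the endianness statement above: Netty's default ByteBuf accessors already write multi-byte integers in big-endian (network) order, which is what the key builders in this package rely on. A tiny self-contained check:

    // Netty ByteBuf writes integrals big-endian by default, matching the
    // "network byte order" convention stated for this package.
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;

    class EndianCheck {
        public static void main(String[] args) {
            ByteBuf buf = Unpooled.buffer(2);
            buf.writeShort(0x0102);
            System.out.println(buf.getByte(0) + " " + buf.getByte(1)); // prints "1 2": most significant byte first
            buf.release();
        }
    }
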
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index 403324137c..ea46f1d8a1 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -43,7 +43,7 @@ public class ConsumerOffsetManager extends ConfigManager {
     protected static final Logger LOG = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     public static final String TOPIC_GROUP_SEPARATOR = "@";
 
-    private DataVersion dataVersion = new DataVersion();
+    protected DataVersion dataVersion = new DataVersion();
 
     protected ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, 
Long>> offsetTable =
         new ConcurrentHashMap<>(512);
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
index e6855ef9a2..f62a3e4a09 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -49,7 +49,7 @@ public class SubscriptionGroupManager extends ConfigManager {
     private ConcurrentMap<String, ConcurrentMap<String, Integer>> 
forbiddenTable =
         new ConcurrentHashMap<>(4);
 
-    private final DataVersion dataVersion = new DataVersion();
+    protected final DataVersion dataVersion = new DataVersion();
     protected transient BrokerController brokerController;
 
     public SubscriptionGroupManager() {
@@ -143,7 +143,7 @@ public class SubscriptionGroupManager extends ConfigManager 
{
         this.persist();
     }
 
-    private void updateSubscriptionGroupConfigWithoutPersist(SubscriptionGroupConfig config) {
+    protected void updateSubscriptionGroupConfigWithoutPersist(SubscriptionGroupConfig config) {
         Map<String, String> newAttributes = request(config);
         Map<String, String> currentAttributes = current(config.getGroupName());
 
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 25d3218f2a..4530c10002 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -66,7 +66,7 @@ public class TopicConfigManager extends ConfigManager {
 
     private transient final Lock topicConfigTableLock = new ReentrantLock();
     protected ConcurrentMap<String, TopicConfig> topicConfigTable = new 
ConcurrentHashMap<>(1024);
-    private DataVersion dataVersion = new DataVersion();
+    protected DataVersion dataVersion = new DataVersion();
     protected transient BrokerController brokerController;
 
     public TopicConfigManager() {
@@ -497,7 +497,7 @@ public class TopicConfigManager extends ConfigManager {
         }
     }
 
-    private void updateSingleTopicConfigWithoutPersist(final TopicConfig topicConfig) {
+    protected void updateSingleTopicConfigWithoutPersist(final TopicConfig topicConfig) {
         checkNotNull(topicConfig, "topicConfig shouldn't be null");
 
         Map<String, String> newAttributes = request(topicConfig);
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManagerTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManagerTest.java
index 58b690c9a3..5a7a2c38ac 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManagerTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManagerTest.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.junit.After;
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java
index ea6528546d..1b9916d6ac 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java
@@ -18,6 +18,7 @@
 package org.apache.rocketmq.broker.offset;
 
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.config.v1.RocksDBLmqConsumerOffsetManager;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapperTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapperTest.java
index dde0401e8a..c01e63f31f 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapperTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapperTest.java
@@ -21,6 +21,7 @@ package org.apache.rocketmq.broker.offset;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.rocketmq.broker.config.v1.RocksDBOffsetSerializeWrapper;
 import org.junit.Before;
 import org.junit.Test;
 
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java
index b4800aec24..64c505eb77 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.commons.collections.MapUtils;
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.Pair;
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
index 04324043fb..d87f513355 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@ -36,8 +36,8 @@ import org.apache.rocketmq.broker.client.ConsumerManager;
 import org.apache.rocketmq.broker.client.net.Broker2Client;
 import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
 import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
-import org.apache.rocketmq.broker.subscription.RocksDBSubscriptionGroupManager;
-import org.apache.rocketmq.broker.topic.RocksDBTopicConfigManager;
+import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager;
+import org.apache.rocketmq.broker.config.v1.RocksDBTopicConfigManager;
 import org.apache.rocketmq.broker.topic.TopicConfigManager;
 import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.common.BrokerConfig;
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
index 205e642843..26017af8a6 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
@@ -18,6 +18,7 @@
 package org.apache.rocketmq.broker.subscription;
 
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.remoting.protocol.DataVersion;
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java
index b0e0d05736..080e1dd5a3 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java
@@ -24,6 +24,7 @@ import java.util.Optional;
 import java.util.UUID;
 
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.config.v1.RocksDBTopicConfigManager;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicAttributes;
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java
index 2a72709098..fb345548e4 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java
@@ -18,6 +18,7 @@
 package org.apache.rocketmq.broker.topic;
 
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.config.v1.RocksDBTopicConfigManager;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java 
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 2acfdd69a5..c651047661 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.common;
 
 import org.apache.rocketmq.common.annotation.ImportantField;
+import org.apache.rocketmq.common.config.ConfigManagerVersion;
 import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.message.MessageRequestMode;
 import org.apache.rocketmq.common.metrics.MetricsExporterType;
@@ -431,6 +432,11 @@ public class BrokerConfig extends BrokerIdentity {
 
     private boolean appendCkAsync = false;
 
+    /**
+     * V2 is recommended in cases where the LMQ feature is used extensively.
+     */
+    private String configManagerVersion = ConfigManagerVersion.V1.getVersion();
+
     public String getConfigBlackList() {
         return configBlackList;
     }
@@ -1879,4 +1885,12 @@ public class BrokerConfig extends BrokerIdentity {
     public void setAppendCkAsync(boolean appendCkAsync) {
         this.appendCkAsync = appendCkAsync;
     }
+
+    public String getConfigManagerVersion() {
+        return configManagerVersion;
+    }
+
+    public void setConfigManagerVersion(String configManagerVersion) {
+        this.configManagerVersion = configManagerVersion;
+    }
 }
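
The new BrokerConfig field defaults to v1, so existing brokers keep the v1 managers unless explicitly switched. Below is a minimal programmatic sketch using only the accessors added in this hunk; the equivalent broker.conf property name follows RocketMQ's usual field-based naming, which is assumed rather than shown by this diff.

    // Opt a BrokerConfig into the v2 config managers via the accessors added in this hunk.
    import org.apache.rocketmq.common.BrokerConfig;
    import org.apache.rocketmq.common.config.ConfigManagerVersion;

    class EnableConfigManagerV2 {
        public static void main(String[] args) {
            BrokerConfig brokerConfig = new BrokerConfig();
            brokerConfig.setConfigManagerVersion(ConfigManagerVersion.V2.getVersion());
            System.out.println("configManagerVersion=" + brokerConfig.getConfigManagerVersion()); // prints configManagerVersion=v2
        }
    }
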
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
 
b/common/src/main/java/org/apache/rocketmq/common/config/ConfigManagerVersion.java
similarity index 59%
rename from 
broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
rename to 
common/src/main/java/org/apache/rocketmq/common/config/ConfigManagerVersion.java
index 7a90fd62fb..0d5dd6940a 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/config/ConfigManagerVersion.java
@@ -14,21 +14,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.broker.offset;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+package org.apache.rocketmq.common.config;
 
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+public enum ConfigManagerVersion {
+    V1("v1"),
+    V2("v2"),
+    ;
+    private final String version;
 
-public class RocksDBOffsetSerializeWrapper extends RemotingSerializable {
-    private ConcurrentMap<Integer, Long> offsetTable = new 
ConcurrentHashMap(16);
-
-    public ConcurrentMap<Integer, Long> getOffsetTable() {
-        return offsetTable;
+    ConfigManagerVersion(String version) {
+        this.version = version;
     }
 
-    public void setOffsetTable(ConcurrentMap<Integer, Long> offsetTable) {
-        this.offsetTable = offsetTable;
+    public String getVersion() {
+        return version;
     }
 }
