This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new c8938e0c6c [ISSUE #8829] feat: provide ConfigManagerV2 to make best uses of RocksDB (#8856) c8938e0c6c is described below commit c8938e0c6c263cdb73b593ee9984841115604679 Author: Zhanhui Li <lizhan...@apache.org> AuthorDate: Mon Oct 28 10:00:33 2024 +0800 [ISSUE #8829] feat: provide ConfigManagerV2 to make best uses of RocksDB (#8856) * feat: provide ConfigManagerV2 to make best uses of RocksDB Signed-off-by: Li Zhanhui <lizhan...@gmail.com> * fix: release RocksDB objects using try-with-resource Signed-off-by: Li Zhanhui <lizhan...@gmail.com> --------- Signed-off-by: Li Zhanhui <lizhan...@gmail.com> --- .../apache/rocketmq/broker/BrokerController.java | 39 +- .../v1}/RocksDBConsumerOffsetManager.java | 3 +- .../v1}/RocksDBLmqConsumerOffsetManager.java | 2 +- .../v1}/RocksDBLmqSubscriptionGroupManager.java | 2 +- .../v1}/RocksDBLmqTopicConfigManager.java | 2 +- .../v1}/RocksDBOffsetSerializeWrapper.java | 2 +- .../v1}/RocksDBSubscriptionGroupManager.java | 3 +- .../v1}/RocksDBTopicConfigManager.java | 3 +- .../rocketmq/broker/config/v2/ConfigHelper.java | 132 +++++++ .../rocketmq/broker/config/v2/ConfigStorage.java | 122 ++++++ .../broker/config/v2/ConsumerOffsetManagerV2.java | 426 +++++++++++++++++++++ .../v2/RecordPrefix.java} | 21 +- .../v2/SerializationType.java} | 32 +- .../config/v2/SubscriptionGroupManagerV2.java | 171 +++++++++ .../v2/TableId.java} | 26 +- .../v2/TablePrefix.java} | 20 +- .../broker/config/v2/TopicConfigManagerV2.java | 191 +++++++++ .../v2/package-info.java} | 26 +- .../broker/offset/ConsumerOffsetManager.java | 2 +- .../subscription/SubscriptionGroupManager.java | 4 +- .../rocketmq/broker/topic/TopicConfigManager.java | 4 +- .../offset/RocksDBConsumerOffsetManagerTest.java | 1 + .../RocksDBLmqConsumerOffsetManagerTest.java | 1 + .../offset/RocksDBOffsetSerializeWrapperTest.java | 1 + .../offset/RocksdbTransferOffsetAndCqTest.java | 1 + 
.../broker/processor/AdminBrokerProcessorTest.java | 4 +- .../RocksdbGroupConfigTransferTest.java | 1 + .../topic/RocksdbTopicConfigManagerTest.java | 1 + .../topic/RocksdbTopicConfigTransferTest.java | 1 + .../org/apache/rocketmq/common/BrokerConfig.java | 14 + .../common/config/ConfigManagerVersion.java | 21 +- 31 files changed, 1186 insertions(+), 93 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 05a00a5005..ee211e1b80 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -76,8 +76,8 @@ import org.apache.rocketmq.broker.offset.BroadcastOffsetManager; import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager; import org.apache.rocketmq.broker.offset.LmqConsumerOffsetManager; -import org.apache.rocketmq.broker.offset.RocksDBConsumerOffsetManager; -import org.apache.rocketmq.broker.offset.RocksDBLmqConsumerOffsetManager; +import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager; +import org.apache.rocketmq.broker.config.v1.RocksDBLmqConsumerOffsetManager; import org.apache.rocketmq.broker.out.BrokerOuterAPI; import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin; import org.apache.rocketmq.broker.processor.AckMessageProcessor; @@ -99,12 +99,16 @@ import org.apache.rocketmq.broker.processor.SendMessageProcessor; import org.apache.rocketmq.broker.schedule.ScheduleMessageService; import org.apache.rocketmq.broker.slave.SlaveSynchronize; import org.apache.rocketmq.broker.subscription.LmqSubscriptionGroupManager; -import org.apache.rocketmq.broker.subscription.RocksDBLmqSubscriptionGroupManager; -import org.apache.rocketmq.broker.subscription.RocksDBSubscriptionGroupManager; +import 
org.apache.rocketmq.broker.config.v1.RocksDBLmqSubscriptionGroupManager; +import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager; import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; +import org.apache.rocketmq.broker.config.v2.ConsumerOffsetManagerV2; +import org.apache.rocketmq.broker.config.v2.SubscriptionGroupManagerV2; +import org.apache.rocketmq.broker.config.v2.TopicConfigManagerV2; +import org.apache.rocketmq.broker.config.v2.ConfigStorage; import org.apache.rocketmq.broker.topic.LmqTopicConfigManager; -import org.apache.rocketmq.broker.topic.RocksDBLmqTopicConfigManager; -import org.apache.rocketmq.broker.topic.RocksDBTopicConfigManager; +import org.apache.rocketmq.broker.config.v1.RocksDBLmqTopicConfigManager; +import org.apache.rocketmq.broker.config.v1.RocksDBTopicConfigManager; import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.broker.topic.TopicQueueMappingCleanService; import org.apache.rocketmq.broker.topic.TopicQueueMappingManager; @@ -124,6 +128,7 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.config.ConfigManagerVersion; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.message.MessageExt; @@ -239,6 +244,11 @@ public class BrokerController { protected RemotingServer remotingServer; protected CountDownLatch remotingServerStartLatch; protected RemotingServer fastRemotingServer; + + /** + * If {Topic, SubscriptionGroup, Offset}ManagerV2 are used, config entries are stored in RocksDB. 
+ */ + protected ConfigStorage configStorage; protected TopicConfigManager topicConfigManager; protected SubscriptionGroupManager subscriptionGroupManager; protected TopicQueueMappingManager topicQueueMappingManager; @@ -334,7 +344,12 @@ public class BrokerController { this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), getListenPort())); this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new LmqBrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()) : new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()); this.broadcastOffsetManager = new BroadcastOffsetManager(this); - if (this.messageStoreConfig.isEnableRocksDBStore()) { + if (ConfigManagerVersion.V2.getVersion().equals(brokerConfig.getConfigManagerVersion())) { + this.configStorage = new ConfigStorage(messageStoreConfig.getStorePathRootDir()); + this.topicConfigManager = new TopicConfigManagerV2(this, configStorage); + this.subscriptionGroupManager = new SubscriptionGroupManagerV2(this, configStorage); + this.consumerOffsetManager = new ConsumerOffsetManagerV2(this, configStorage); + } else if (this.messageStoreConfig.isEnableRocksDBStore()) { this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqTopicConfigManager(this) : new RocksDBTopicConfigManager(this); this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqSubscriptionGroupManager(this) : new RocksDBSubscriptionGroupManager(this); this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? 
new RocksDBLmqConsumerOffsetManager(this) : new RocksDBConsumerOffsetManager(this); @@ -771,7 +786,11 @@ public class BrokerController { } public boolean initializeMetadata() { - boolean result = this.topicConfigManager.load(); + boolean result = true; + if (null != configStorage) { + result = configStorage.start(); + } + result = result && this.topicConfigManager.load(); result = result && this.topicQueueMappingManager.load(); result = result && this.consumerOffsetManager.load(); result = result && this.subscriptionGroupManager.load(); @@ -1547,6 +1566,10 @@ public class BrokerController { this.consumerOffsetManager.stop(); } + if (null != configStorage) { + configStorage.shutdown(); + } + if (this.authenticationMetadataManager != null) { this.authenticationMetadataManager.shutdown(); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java similarity index 98% rename from broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java rename to broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java index 1e7cda71ee..8066fe769a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.rocketmq.broker.offset; +package org.apache.rocketmq.broker.config.v1; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.SerializerFeature; @@ -24,6 +24,7 @@ import java.util.Map.Entry; import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.RocksDBConfigManager; +import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.utils.DataConverter; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBLmqConsumerOffsetManager.java similarity index 98% rename from broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManager.java rename to broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBLmqConsumerOffsetManager.java index d0faa66140..e961c6c635 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBLmqConsumerOffsetManager.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.rocketmq.broker.offset; +package org.apache.rocketmq.broker.config.v1; import java.util.HashMap; import java.util.Map; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBLmqSubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBLmqSubscriptionGroupManager.java similarity index 97% rename from broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBLmqSubscriptionGroupManager.java rename to broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBLmqSubscriptionGroupManager.java index e4de25756b..05f3f7d2ec 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBLmqSubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBLmqSubscriptionGroupManager.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.broker.subscription; +package org.apache.rocketmq.broker.config.v1; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.MixAll; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBLmqTopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBLmqTopicConfigManager.java similarity index 97% rename from broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBLmqTopicConfigManager.java rename to broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBLmqTopicConfigManager.java index d049a8dbcd..7b27801396 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBLmqTopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBLmqTopicConfigManager.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.rocketmq.broker.topic; +package org.apache.rocketmq.broker.config.v1; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.MixAll; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBOffsetSerializeWrapper.java similarity index 96% copy from broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java copy to broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBOffsetSerializeWrapper.java index 7a90fd62fb..4801cfc681 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBOffsetSerializeWrapper.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.broker.offset; +package org.apache.rocketmq.broker.config.v1; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java similarity index 98% rename from broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java rename to broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java index 5119f78672..8175d63cce 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.rocketmq.broker.subscription; +package org.apache.rocketmq.broker.config.v1; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; @@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.function.BiConsumer; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.RocksDBConfigManager; +import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.utils.DataConverter; import org.apache.rocketmq.remoting.protocol.DataVersion; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java similarity index 98% rename from broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java rename to broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java index 466e6416f9..bce67392f6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/RocksDBTopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.rocketmq.broker.topic; +package org.apache.rocketmq.broker.config.v1; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.SerializerFeature; @@ -23,6 +23,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.RocksDBConfigManager; +import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.utils.DataConverter; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigHelper.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigHelper.java new file mode 100644 index 0000000000..8183a1f835 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigHelper.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.broker.config.v2; + +import com.alibaba.fastjson2.JSON; +import com.google.common.base.Preconditions; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.nio.charset.StandardCharsets; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.rocketmq.common.config.AbstractRocksDBStorage; +import org.apache.rocketmq.remoting.protocol.DataVersion; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; + +public class ConfigHelper { + + /** + * <p> + * Layout of data version key: + * [table-prefix, 1 byte][table-id, 2 byte][record-prefix, 1 byte][data-version-bytes] + * </p> + * + * <p> + * Layout of data version value: + * [state-machine-version, 8 bytes][timestamp, 8 bytes][sequence counter, 8 bytes] + * </p> + * + * @throws RocksDBException if RocksDB raises an error + */ + public static Optional<ByteBuf> loadDataVersion(ConfigStorage configStorage, TableId tableId) + throws RocksDBException { + int keyLen = 1 /* table-prefix */ + Short.BYTES /* table-id */ + 1 /* record-prefix */ + + ConfigStorage.DATA_VERSION_KEY_BYTES.length; + ByteBuf keyBuf = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen); + try { + keyBuf.writeByte(TablePrefix.TABLE.getValue()); + keyBuf.writeShort(tableId.getValue()); + keyBuf.writeByte(RecordPrefix.DATA_VERSION.getValue()); + keyBuf.writeBytes(ConfigStorage.DATA_VERSION_KEY_BYTES); + byte[] valueByes = configStorage.get(keyBuf.nioBuffer()); + if (null != valueByes) { + ByteBuf valueBuf = Unpooled.wrappedBuffer(valueByes); + return Optional.of(valueBuf); + } + } finally { + keyBuf.release(); + } + return Optional.empty(); + } + + public static void stampDataVersion(WriteBatch writeBatch, DataVersion dataVersion, long stateMachineVersion) + throws RocksDBException { + // Increase data version + dataVersion.nextVersion(stateMachineVersion); + + int keyLen = 1 /* table-prefix */ + Short.BYTES /* table-id */ + 1 /* 
record-prefix */ + + ConfigStorage.DATA_VERSION_KEY_BYTES.length; + ByteBuf keyBuf = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen); + ByteBuf valueBuf = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(Long.BYTES * 3); + try { + keyBuf.writeByte(TablePrefix.TABLE.getValue()); + keyBuf.writeShort(TableId.CONSUMER_OFFSET.getValue()); + keyBuf.writeByte(RecordPrefix.DATA_VERSION.getValue()); + keyBuf.writeBytes(ConfigStorage.DATA_VERSION_KEY_BYTES); + valueBuf.writeLong(dataVersion.getStateVersion()); + valueBuf.writeLong(dataVersion.getTimestamp()); + valueBuf.writeLong(dataVersion.getCounter().get()); + writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer()); + } finally { + keyBuf.release(); + valueBuf.release(); + } + } + + public static void onDataVersionLoad(ByteBuf buf, DataVersion dataVersion) { + if (buf.readableBytes() == 8 /* state machine version */ + 8 /* timestamp */ + 8 /* counter */) { + long stateMachineVersion = buf.readLong(); + long timestamp = buf.readLong(); + long counter = buf.readLong(); + dataVersion.setStateVersion(stateMachineVersion); + dataVersion.setTimestamp(timestamp); + dataVersion.setCounter(new AtomicLong(counter)); + } + buf.release(); + } + + public static ByteBuf keyBufOf(TableId tableId, final String name) { + Preconditions.checkNotNull(name); + byte[] bytes = name.getBytes(StandardCharsets.UTF_8); + int keyLen = 1 /* table-prefix */ + 2 /* table-id */ + 1 /* record-type-prefix */ + 2 /* name-length */ + bytes.length; + ByteBuf keyBuf = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen); + keyBuf.writeByte(TablePrefix.TABLE.getValue()); + keyBuf.writeShort(tableId.getValue()); + keyBuf.writeByte(RecordPrefix.DATA.getValue()); + keyBuf.writeShort(bytes.length); + keyBuf.writeBytes(bytes); + return keyBuf; + } + + public static ByteBuf valueBufOf(final Object config, SerializationType serializationType) { + if (SerializationType.JSON == serializationType) { + byte[] payload = JSON.toJSONBytes(config); + ByteBuf 
valueBuf = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(1 + payload.length); + valueBuf.writeByte(SerializationType.JSON.getValue()); + valueBuf.writeBytes(payload); + return valueBuf; + } + throw new RuntimeException("Unsupported serialization type: " + serializationType); + } + + public static byte[] readBytes(final ByteBuf buf) { + byte[] bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes); + return bytes; + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java new file mode 100644 index 0000000000..af259aaa37 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.rocketmq.broker.config.v2; + +import io.netty.util.internal.PlatformDependent; +import java.io.File; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.config.AbstractRocksDBStorage; +import org.apache.rocketmq.common.config.ConfigHelper; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.DirectSlice; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +/** + * https://book.tidb.io/session1/chapter3/tidb-kv-to-relation.html + */ +public class ConfigStorage extends AbstractRocksDBStorage { + + public static final String DATA_VERSION_KEY = "data_version"; + public static final byte[] DATA_VERSION_KEY_BYTES = DATA_VERSION_KEY.getBytes(StandardCharsets.UTF_8); + + public ConfigStorage(String storePath) { + super(storePath + File.separator + "config" + File.separator + "rdb"); + } + + @Override + protected boolean postLoad() { + if (!PlatformDependent.hasUnsafe()) { + LOGGER.error("Unsafe not available and POOLED_ALLOCATOR cannot work correctly"); + return false; + } + try { + UtilAll.ensureDirOK(this.dbPath); + initOptions(); + List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>(); + + ColumnFamilyOptions defaultOptions = ConfigHelper.createConfigOptions(); + this.cfOptions.add(defaultOptions); + cfDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, defaultOptions)); + + // Start RocksDB instance + open(cfDescriptors); + + this.defaultCFHandle = cfHandles.get(0); + } catch (final Exception e) { + AbstractRocksDBStorage.LOGGER.error("postLoad Failed. 
{}", this.dbPath, e); + return false; + } + return true; + } + + @Override + protected void preShutdown() { + + } + + protected void initOptions() { + this.options = ConfigHelper.createConfigDBOptions(); + super.initOptions(); + } + + @Override + protected void initAbleWalWriteOptions() { + this.ableWalWriteOptions = new WriteOptions(); + + // For metadata, prioritize data integrity + this.ableWalWriteOptions.setSync(true); + + // We need WAL for config changes + this.ableWalWriteOptions.setDisableWAL(false); + + // No fast failure on block, wait synchronously even if there is wait for the write request + this.ableWalWriteOptions.setNoSlowdown(false); + } + + public byte[] get(ByteBuffer key) throws RocksDBException { + byte[] keyBytes = new byte[key.remaining()]; + key.get(keyBytes); + return super.get(getDefaultCFHandle(), totalOrderReadOptions, keyBytes); + } + + public void write(WriteBatch writeBatch) throws RocksDBException { + db.write(ableWalWriteOptions, writeBatch); + } + + public RocksIterator iterate(ByteBuffer beginKey, ByteBuffer endKey) { + try (ReadOptions readOptions = new ReadOptions()) { + readOptions.setTotalOrderSeek(true); + readOptions.setTailing(false); + readOptions.setAutoPrefixMode(true); + readOptions.setIterateLowerBound(new DirectSlice(beginKey)); + readOptions.setIterateUpperBound(new DirectSlice(endKey)); + RocksIterator iterator = db.newIterator(defaultCFHandle, readOptions); + iterator.seekToFirst(); + return iterator; + } + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java new file mode 100644 index 0000000000..5b0885c491 --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java @@ -0,0 +1,426 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.config.v2; + +import io.netty.buffer.ByteBuf; +import io.netty.util.internal.PlatformDependent; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.config.AbstractRocksDBStorage; +import org.apache.rocketmq.store.MessageStore; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.WriteBatch; + +/** + * <p> + * Layout of consumer offset key: + * [table-prefix, 1 byte][table-id, 2 bytes][record-prefix, 1 byte][group-len, 2 bytes][group bytes][CTRL_1, 1 byte] + * [topic-len, 2 bytes][topic bytes][CTRL_1, 1 byte][queue-id, 4 bytes] + * </p> + * + * <p> + * Layout of consumer offset value: [offset, 8 bytes] + * </p> + */ +public class ConsumerOffsetManagerV2 extends ConsumerOffsetManager { + + private final ConfigStorage configStorage; + + public ConsumerOffsetManagerV2(BrokerController brokerController, ConfigStorage configStorage) { + super(brokerController); + this.configStorage = configStorage; + } + + 
@Override + protected void removeConsumerOffset(String topicAtGroup) { + if (!MixAll.isLmq(topicAtGroup)) { + super.removeConsumerOffset(topicAtGroup); + } + + String[] topicGroup = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); + if (topicGroup.length != 2) { + LOG.error("Invalid topic group: {}", topicAtGroup); + return; + } + + byte[] topicBytes = topicGroup[0].getBytes(StandardCharsets.UTF_8); + byte[] groupBytes = topicGroup[1].getBytes(StandardCharsets.UTF_8); + + int keyLen = 1 /* table-prefix */ + Short.BYTES /* table-id */ + 1 /* record-prefix */ + + Short.BYTES /* group-len */ + groupBytes.length + 1 /* CTRL_1 */ + + Short.BYTES + topicBytes.length + 1; + // [table-prefix, 1 byte][table-id, 2 bytes][record-prefix, 1 byte][group-len, 2 bytes][group-bytes][CTRL_1, 1 byte] + // [topic-len, 2 bytes][topic-bytes][CTRL_1] + ByteBuf beginKey = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen); + beginKey.writeByte(TablePrefix.TABLE.getValue()); + beginKey.writeShort(TableId.CONSUMER_OFFSET.getValue()); + beginKey.writeByte(RecordPrefix.DATA.getValue()); + beginKey.writeShort(groupBytes.length); + beginKey.writeBytes(groupBytes); + beginKey.writeByte(AbstractRocksDBStorage.CTRL_1); + beginKey.writeShort(topicBytes.length); + beginKey.writeBytes(topicBytes); + beginKey.writeByte(AbstractRocksDBStorage.CTRL_1); + + ByteBuf endKey = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen); + endKey.writeByte(TablePrefix.TABLE.getValue()); + endKey.writeShort(TableId.CONSUMER_OFFSET.getValue()); + endKey.writeByte(RecordPrefix.DATA.getValue()); + endKey.writeShort(groupBytes.length); + endKey.writeBytes(groupBytes); + endKey.writeByte(AbstractRocksDBStorage.CTRL_1); + endKey.writeShort(topicBytes.length); + endKey.writeBytes(topicBytes); + endKey.writeByte(AbstractRocksDBStorage.CTRL_2); + + try (WriteBatch writeBatch = new WriteBatch()) { + // TODO: we have to make a copy here as WriteBatch lacks ByteBuffer API here + 
writeBatch.deleteRange(ConfigHelper.readBytes(beginKey), ConfigHelper.readBytes(endKey)); + long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; + ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion); + configStorage.write(writeBatch); + } catch (RocksDBException e) { + LOG.error("Failed to removeConsumerOffset, topicAtGroup={}", topicAtGroup, e); + } finally { + beginKey.release(); + endKey.release(); + } + } + + @Override + public void removeOffset(String group) { + if (!MixAll.isLmq(group)) { + super.removeOffset(group); + } + + byte[] groupBytes = group.getBytes(StandardCharsets.UTF_8); + int keyLen = 1 /* table-prefix */ + Short.BYTES /* table-id */ + 1 /* record-prefix */ + + Short.BYTES /* group-len */ + groupBytes.length + 1 /* CTRL_1 */; + + // [table-prefix, 1 byte][table-id, 2 bytes][record-prefix, 1 byte][group-len, 2 bytes][group bytes][CTRL_1, 1 byte] + ByteBuf beginKey = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen); + beginKey.writeByte(TablePrefix.TABLE.getValue()); + beginKey.writeShort(TableId.CONSUMER_OFFSET.getValue()); + beginKey.writeByte(RecordPrefix.DATA.getValue()); + beginKey.writeShort(groupBytes.length); + beginKey.writeBytes(groupBytes); + beginKey.writeByte(AbstractRocksDBStorage.CTRL_1); + + ByteBuf endKey = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen); + endKey.writeByte(TablePrefix.TABLE.getValue()); + endKey.writeShort(TableId.CONSUMER_OFFSET.getValue()); + endKey.writeByte(RecordPrefix.DATA.getValue()); + endKey.writeShort(groupBytes.length); + endKey.writeBytes(groupBytes); + endKey.writeByte(AbstractRocksDBStorage.CTRL_2); + try (WriteBatch writeBatch = new WriteBatch()) { + // TODO: we have to make a copy here as WriteBatch lacks ByteBuffer API here + writeBatch.deleteRange(ConfigHelper.readBytes(beginKey), ConfigHelper.readBytes(endKey)); + MessageStore messageStore = 
brokerController.getMessageStore(); + long stateMachineVersion = messageStore != null ? messageStore.getStateMachineVersion() : 0; + ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion); + configStorage.write(writeBatch); + } catch (RocksDBException e) { + LOG.error("Failed to consumer offsets by group={}", group, e); + } finally { + beginKey.release(); + endKey.release(); + } + } + + /** + * <p> + * Layout of consumer offset key: + * [table-prefix, 1 byte][table-id, 2 bytes][record-prefix, 1 byte][group-len, 2 bytes][group bytes][CTRL_1, 1 byte] + * [topic-len, 2 bytes][topic bytes][CTRL_1, 1 byte][queue-id, 4 bytes] + * </p> + * + * <p> + * Layout of consumer offset value: + * [offset, 8 bytes] + * </p> + * + * @param clientHost The client that submits consumer offsets + * @param group Group name + * @param topic Topic name + * @param queueId Queue ID + * @param offset Consumer offset of the specified queue + */ + @Override + public void commitOffset(String clientHost, String group, String topic, int queueId, long offset) { + String key = topic + TOPIC_GROUP_SEPARATOR + group; + + // We maintain a copy of classic consumer offset table in memory as they take very limited memory footprint. + // For LMQ offsets, given the volume and number of these type of records, they are maintained in RocksDB + // directly. Frequently used LMQ consumer offsets should reside either in block-cache or MemTable, so read/write + // should be blazingly fast. 
+ if (!MixAll.isLmq(topic)) { + if (offsetTable.containsKey(key)) { + offsetTable.get(key).put(queueId, offset); + } else { + ConcurrentMap<Integer, Long> map = new ConcurrentHashMap<>(); + ConcurrentMap<Integer, Long> prev = offsetTable.putIfAbsent(key, map); + if (null != prev) { + map = prev; + } + map.put(queueId, offset); + } + } + + ByteBuf keyBuf = keyOfConsumerOffset(group, topic, queueId); + ByteBuf valueBuf = ConfigStorage.POOLED_ALLOCATOR.buffer(Long.BYTES); + try (WriteBatch writeBatch = new WriteBatch()) { + valueBuf.writeLong(offset); + writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer()); + MessageStore messageStore = brokerController.getMessageStore(); + long stateMachineVersion = messageStore != null ? messageStore.getStateMachineVersion() : 0; + ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion); + configStorage.write(writeBatch); + } catch (RocksDBException e) { + LOG.error("Failed to commit consumer offset", e); + } finally { + keyBuf.release(); + valueBuf.release(); + } + } + + private ByteBuf keyOfConsumerOffset(String group, String topic, int queueId) { + byte[] groupBytes = group.getBytes(StandardCharsets.UTF_8); + byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8); + int keyLen = 1 /*table prefix*/ + Short.BYTES /*table-id*/ + 1 /*record-prefix*/ + + Short.BYTES /*group-len*/ + groupBytes.length + 1 /*CTRL_1*/ + + 2 /*topic-len*/ + topicBytes.length + 1 /* CTRL_1*/ + + Integer.BYTES /*queue-id*/; + ByteBuf keyBuf = ConfigStorage.POOLED_ALLOCATOR.buffer(keyLen); + keyBuf.writeByte(TablePrefix.TABLE.getValue()); + keyBuf.writeShort(TableId.CONSUMER_OFFSET.getValue()); + keyBuf.writeByte(RecordPrefix.DATA.getValue()); + keyBuf.writeShort(groupBytes.length); + keyBuf.writeBytes(groupBytes); + keyBuf.writeByte(AbstractRocksDBStorage.CTRL_1); + keyBuf.writeShort(topicBytes.length); + keyBuf.writeBytes(topicBytes); + keyBuf.writeByte(AbstractRocksDBStorage.CTRL_1); + keyBuf.writeInt(queueId); + return keyBuf; + 
} + + private ByteBuf keyOfPullOffset(String group, String topic, int queueId) { + byte[] groupBytes = group.getBytes(StandardCharsets.UTF_8); + byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8); + int keyLen = 1 /*table prefix*/ + Short.BYTES /*table-id*/ + 1 /*record-prefix*/ + + Short.BYTES /*group-len*/ + groupBytes.length + 1 /*CTRL_1*/ + + 2 /*topic-len*/ + topicBytes.length + 1 /* CTRL_1*/ + + Integer.BYTES /*queue-id*/; + ByteBuf keyBuf = ConfigStorage.POOLED_ALLOCATOR.buffer(keyLen); + keyBuf.writeByte(TablePrefix.TABLE.getValue()); + keyBuf.writeShort(TableId.PULL_OFFSET.getValue()); + keyBuf.writeByte(RecordPrefix.DATA.getValue()); + keyBuf.writeShort(groupBytes.length); + keyBuf.writeBytes(groupBytes); + keyBuf.writeByte(AbstractRocksDBStorage.CTRL_1); + keyBuf.writeShort(topicBytes.length); + keyBuf.writeBytes(topicBytes); + keyBuf.writeByte(AbstractRocksDBStorage.CTRL_1); + keyBuf.writeInt(queueId); + return keyBuf; + } + + @Override + public boolean load() { + return loadDataVersion() && loadConsumerOffsets(); + } + + @Override + public synchronized void persist() { + try { + configStorage.flushWAL(); + } catch (RocksDBException e) { + LOG.error("Failed to flush RocksDB config instance WAL", e); + } + } + + /** + * <p> + * Layout of data version key: + * [table-prefix, 1 byte][table-id, 2 byte][record-prefix, 1 byte][data-version-bytes] + * </p> + * + * <p> + * Layout of data version value: + * [state-machine-version, 8 bytes][timestamp, 8 bytes][sequence counter, 8 bytes] + * </p> + */ + public boolean loadDataVersion() { + try { + ConfigHelper.loadDataVersion(configStorage, TableId.CONSUMER_OFFSET) + .ifPresent(buf -> ConfigHelper.onDataVersionLoad(buf, dataVersion)); + } catch (RocksDBException e) { + LOG.error("Failed to load RocksDB config", e); + return false; + } + return true; + } + + private boolean loadConsumerOffsets() { + // [table-prefix, 1 byte][table-id, 2 bytes][record-prefix, 1 byte] + ByteBuf beginKeyBuf = 
AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(4); + beginKeyBuf.writeByte(TablePrefix.TABLE.getValue()); + beginKeyBuf.writeShort(TableId.CONSUMER_OFFSET.getValue()); + beginKeyBuf.writeByte(RecordPrefix.DATA.getValue()); + + ByteBuf endKeyBuf = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(4); + endKeyBuf.writeByte(TablePrefix.TABLE.getValue()); + endKeyBuf.writeShort(TableId.CONSUMER_OFFSET.getValue()); + endKeyBuf.writeByte(RecordPrefix.DATA.getValue() + 1); + + try (RocksIterator iterator = configStorage.iterate(beginKeyBuf.nioBuffer(), endKeyBuf.nioBuffer())) { + int keyCapacity = 256; + // We may iterate millions of LMQ consumer offsets here, use direct byte buffers here to avoid memory + // fragment + ByteBuffer keyBuffer = ByteBuffer.allocateDirect(keyCapacity); + ByteBuffer valueBuffer = ByteBuffer.allocateDirect(Long.BYTES); + while (iterator.isValid()) { + keyBuffer.clear(); + valueBuffer.clear(); + + int len = iterator.key(keyBuffer); + if (len > keyCapacity) { + keyCapacity = len; + PlatformDependent.freeDirectBuffer(keyBuffer); + // Reserve more space for key + keyBuffer = ByteBuffer.allocateDirect(keyCapacity); + continue; + } + len = iterator.value(valueBuffer); + assert len == Long.BYTES; + + // skip table-prefix, table-id, record-prefix + keyBuffer.position(1 + 2 + 1); + short groupLen = keyBuffer.getShort(); + byte[] groupBytes = new byte[groupLen]; + keyBuffer.get(groupBytes); + byte ctrl = keyBuffer.get(); + assert ctrl == AbstractRocksDBStorage.CTRL_1; + + short topicLen = keyBuffer.getShort(); + byte[] topicBytes = new byte[topicLen]; + keyBuffer.get(topicBytes); + String topic = new String(topicBytes, StandardCharsets.UTF_8); + ctrl = keyBuffer.get(); + assert ctrl == AbstractRocksDBStorage.CTRL_1; + + int queueId = keyBuffer.getInt(); + + long offset = valueBuffer.getLong(); + + if (!MixAll.isLmq(topic)) { + String group = new String(groupBytes, StandardCharsets.UTF_8); + onConsumerOffsetRecordLoad(topic, group, queueId, offset); + } + 
iterator.next(); + } + PlatformDependent.freeDirectBuffer(keyBuffer); + PlatformDependent.freeDirectBuffer(valueBuffer); + } finally { + beginKeyBuf.release(); + endKeyBuf.release(); + } + return true; + } + + private void onConsumerOffsetRecordLoad(String topic, String group, int queueId, long offset) { + if (MixAll.isLmq(topic)) { + return; + } + String key = topic + TOPIC_GROUP_SEPARATOR + group; + if (!offsetTable.containsKey(key)) { + ConcurrentMap<Integer, Long> map = new ConcurrentHashMap<>(); + offsetTable.putIfAbsent(key, map); + } + offsetTable.get(key).put(queueId, offset); + } + + @Override + public long queryOffset(String group, String topic, int queueId) { + if (!MixAll.isLmq(topic)) { + return super.queryOffset(group, topic, queueId); + } + + ByteBuf keyBuf = keyOfConsumerOffset(group, topic, queueId); + try { + byte[] slice = configStorage.get(keyBuf.nioBuffer()); + if (null == slice) { + return -1; + } + assert slice.length == Long.BYTES; + return ByteBuffer.wrap(slice).getLong(); + } catch (RocksDBException e) { + throw new RuntimeException(e); + } finally { + keyBuf.release(); + }
+    }
+
+    /**
+     * Records the pull offset of a queue.
+     *
+     * <p>
+     * For non-LMQ topics the call is first forwarded to the superclass. In every case the offset is then written
+     * to the config storage under the key built by {@code keyOfPullOffset}, with the value laid out as
+     * [offset, 8 bytes], and the data version is stamped in the same write batch.
+     * </p>
+     *
+     * @param clientHost The client that submits the pull offset
+     * @param group      Group name
+     * @param topic      Topic name
+     * @param queueId    Queue ID
+     * @param offset     Pull offset of the specified queue
+     */
+    @Override
+    public void commitPullOffset(String clientHost, String group, String topic, int queueId, long offset) {
+        if (!MixAll.isLmq(topic)) {
+            super.commitPullOffset(clientHost, group, topic, queueId, offset);
+        }
+
+        ByteBuf keyBuf = keyOfPullOffset(group, topic, queueId);
+        ByteBuf valueBuf = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(Long.BYTES);
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            // The value must be filled in before it is handed to the batch; an empty buffer would store no offset.
+            valueBuf.writeLong(offset);
+            writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer());
+            MessageStore messageStore = brokerController.getMessageStore();
+            long stateMachineVersion = messageStore != null ? messageStore.getStateMachineVersion() : 0;
+            ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion);
+            // Without this call the batch is discarded when the try block closes it.
+            configStorage.write(writeBatch);
+        } catch (RocksDBException e) {
+            LOG.error("Failed to commit pull offset. group={}, topic={}, queueId={}, offset={}",
+                group, topic, queueId, offset, e);
+        } finally {
+            keyBuf.release();
+            valueBuf.release();
+        }
+    }
+
+    /**
+     * Looks up the pull offset of a queue.
+     *
+     * <p>
+     * Non-LMQ topics are answered by the superclass. LMQ topics are read from the config storage using the key
+     * built by {@code keyOfPullOffset}.
+     * </p>
+     *
+     * @param group   Group name
+     * @param topic   Topic name
+     * @param queueId Queue ID
+     * @return the stored pull offset, or -1 if no record exists or the read fails
+     */
+    @Override
+    public long queryPullOffset(String group, String topic, int queueId) {
+        if (!MixAll.isLmq(topic)) {
+            return super.queryPullOffset(group, topic, queueId);
+        }
+
+        ByteBuf keyBuf = keyOfPullOffset(group, topic, queueId);
+        try {
+            byte[] valueBytes = configStorage.get(keyBuf.nioBuffer());
+            if (null == valueBytes) {
+                return -1;
+            }
+            return ByteBuffer.wrap(valueBytes).getLong();
+        } catch (RocksDBException e) {
+            // Pass the exception as the last argument so the cause is logged rather than dropped.
+            LOG.error("Failed to queryPullOffset. group={}, topic={}, queueId={}", group, topic, queueId, e);
+        } finally {
+            keyBuf.release();
+        }
+        return -1;
+    }
+} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/RecordPrefix.java similarity index 59% copy from broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java copy to broker/src/main/java/org/apache/rocketmq/broker/config/v2/RecordPrefix.java index 7a90fd62fb..750d454d4e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/RecordPrefix.java @@ -14,21 +14,20 @@ * See the License for the specific language governing permissions and * limitations under the License.
*/ -package org.apache.rocketmq.broker.offset; +package org.apache.rocketmq.broker.config.v2; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +public enum RecordPrefix { + UNSPECIFIED((byte)0), + DATA_VERSION((byte)1), + DATA((byte)2); -import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + private final byte value; -public class RocksDBOffsetSerializeWrapper extends RemotingSerializable { - private ConcurrentMap<Integer, Long> offsetTable = new ConcurrentHashMap(16); - - public ConcurrentMap<Integer, Long> getOffsetTable() { - return offsetTable; + RecordPrefix(byte value) { + this.value = value; } - public void setOffsetTable(ConcurrentMap<Integer, Long> offsetTable) { - this.offsetTable = offsetTable; + public byte getValue() { + return value; } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SerializationType.java similarity index 57% copy from broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java copy to broker/src/main/java/org/apache/rocketmq/broker/config/v2/SerializationType.java index 7a90fd62fb..2ee157fdc8 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SerializationType.java @@ -14,21 +14,33 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.rocketmq.broker.offset; +package org.apache.rocketmq.broker.config.v2; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +public enum SerializationType { + UNSPECIFIED((byte) 0), -import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + JSON((byte) 1), -public class RocksDBOffsetSerializeWrapper extends RemotingSerializable { - private ConcurrentMap<Integer, Long> offsetTable = new ConcurrentHashMap(16); + PROTOBUF((byte) 2), - public ConcurrentMap<Integer, Long> getOffsetTable() { - return offsetTable; + FLAT_BUFFERS((byte) 3); + + private final byte value; + + SerializationType(byte value) { + this.value = value; + } + + public byte getValue() { + return value; } - public void setOffsetTable(ConcurrentMap<Integer, Long> offsetTable) { - this.offsetTable = offsetTable; + public static SerializationType valueOf(byte value) { + for (SerializationType type : SerializationType.values()) { + if (type.getValue() == value) { + return type; + } + } + return SerializationType.UNSPECIFIED; } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java new file mode 100644 index 0000000000..8da6f9d2bc --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.config.v2; + +import com.alibaba.fastjson2.JSON; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.nio.charset.StandardCharsets; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.config.AbstractRocksDBStorage; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.WriteBatch; + +public class SubscriptionGroupManagerV2 extends SubscriptionGroupManager { + + private final ConfigStorage configStorage; + + public SubscriptionGroupManagerV2(BrokerController brokerController, ConfigStorage configStorage) { + super(brokerController); + this.configStorage = configStorage; + } + + @Override + public boolean load() { + return loadDataVersion() && loadSubscriptions(); + } + + public boolean loadDataVersion() { + try { + ConfigHelper.loadDataVersion(configStorage, TableId.SUBSCRIPTION_GROUP) + .ifPresent(buf -> { + ConfigHelper.onDataVersionLoad(buf, dataVersion); + }); + } catch (RocksDBException e) { + log.error("loadDataVersion error", e); + return false; + } + return true; + }
+
+    /**
+     * Loads every subscription group record from the config storage into memory.
+     *
+     * <p>
+     * The scan starts at [table-prefix][SUBSCRIPTION_GROUP table-id][DATA record-prefix] and is bounded by the
+     * same key with the record-prefix incremented by one. Each record that {@code parseSubscription} can decode
+     * is registered through {@code updateSubscriptionGroupConfigWithoutPersist}; records it cannot decode are
+     * skipped.
+     * </p>
+     *
+     * @return always {@code true}
+     */
+    private boolean loadSubscriptions() {
+        int keyLen = 1 /* table prefix */ + 2 /* table-id */ + 1 /* record-type-prefix */;
+        ByteBuf beginKey = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
+        beginKey.writeByte(TablePrefix.TABLE.getValue());
+        beginKey.writeShort(TableId.SUBSCRIPTION_GROUP.getValue());
+        beginKey.writeByte(RecordPrefix.DATA.getValue());
+
+        ByteBuf endKey = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen);
+        endKey.writeByte(TablePrefix.TABLE.getValue());
+        endKey.writeShort(TableId.SUBSCRIPTION_GROUP.getValue());
+        endKey.writeByte(RecordPrefix.DATA.getValue() + 1);
+
+        try (RocksIterator iterator = configStorage.iterate(beginKey.nioBuffer(), endKey.nioBuffer())) {
+            while (iterator.isValid()) {
+                SubscriptionGroupConfig subscriptionGroupConfig = parseSubscription(iterator.key(), iterator.value());
+                if (null != subscriptionGroupConfig) {
+                    super.updateSubscriptionGroupConfigWithoutPersist(subscriptionGroupConfig);
+                }
+                // Advance the cursor; without this the loop re-reads the first record forever.
+                iterator.next();
+            }
+        } finally {
+            beginKey.release();
+            endKey.release();
+        }
+        return true;
+    }
+
+ private SubscriptionGroupConfig parseSubscription(byte[] key, byte[] value) { + ByteBuf keyBuf = Unpooled.wrappedBuffer(key); + ByteBuf valueBuf = Unpooled.wrappedBuffer(value); + try { + // Skip table-prefix, table-id, record-type-prefix + keyBuf.readerIndex(4); + short groupNameLen = keyBuf.readShort(); + assert groupNameLen == keyBuf.readableBytes(); + CharSequence groupName = keyBuf.readCharSequence(groupNameLen, StandardCharsets.UTF_8); + assert null != groupName; + byte serializationType = valueBuf.readByte(); + if (SerializationType.JSON == SerializationType.valueOf(serializationType)) { + CharSequence json = valueBuf.readCharSequence(valueBuf.readableBytes(), StandardCharsets.UTF_8); + SubscriptionGroupConfig subscriptionGroupConfig = JSON.parseObject(json.toString(), SubscriptionGroupConfig.class); + assert subscriptionGroupConfig != null; + assert groupName.equals(subscriptionGroupConfig.getGroupName()); + return subscriptionGroupConfig; + } + } finally { + keyBuf.release(); + valueBuf.release(); + } + return null; + } + + @Override + public synchronized void persist() { + try { + configStorage.flushWAL(); + } catch
(RocksDBException e) { + log.error("Failed to flush RocksDB WAL", e); + } + } + + @Override + public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) { + if (MixAll.isLmq(group)) { + SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); + subscriptionGroupConfig.setGroupName(group); + return subscriptionGroupConfig; + } + return super.findSubscriptionGroupConfig(group); + } + + @Override + public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) { + if (config == null || MixAll.isLmq(config.getGroupName())) { + return; + } + ByteBuf keyBuf = ConfigHelper.keyBufOf(TableId.SUBSCRIPTION_GROUP, config.getGroupName()); + ByteBuf valueBuf = ConfigHelper.valueBufOf(config, SerializationType.JSON); + try (WriteBatch writeBatch = new WriteBatch()) { + writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer()); + long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; + ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion); + configStorage.write(writeBatch); + } catch (RocksDBException e) { + log.error("update subscription group config error", e); + } finally { + keyBuf.release(); + valueBuf.release(); + } + super.updateSubscriptionGroupConfigWithoutPersist(config); + } + + @Override + public boolean containsSubscriptionGroup(String group) { + if (MixAll.isLmq(group)) { + return true; + } else { + return super.containsSubscriptionGroup(group); + } + }
+
+    /**
+     * Deletes a subscription group from the config storage and then from the in-memory table.
+     *
+     * <p>
+     * The delete and the data-version stamp go into one write batch. A storage failure is logged and the
+     * in-memory removal still proceeds.
+     * </p>
+     *
+     * @param groupName Name of the subscription group to remove
+     * @return whatever the superclass {@code removeSubscriptionGroupConfig} returns for this group
+     */
+    @Override
+    protected SubscriptionGroupConfig removeSubscriptionGroupConfig(String groupName) {
+        ByteBuf keyBuf = ConfigHelper.keyBufOf(TableId.SUBSCRIPTION_GROUP, groupName);
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            writeBatch.delete(ConfigHelper.readBytes(keyBuf));
+            // The message store may be absent; fall back to 0 as the other write paths in this class do.
+            long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
+            ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion);
+            // Without this call the delete is never applied and the group reappears on the next load.
+            configStorage.write(writeBatch);
+        } catch (RocksDBException e) {
+            log.error("Failed to remove subscription group config by group-name={}", groupName, e);
+        } finally {
+            // Return the pooled key buffer, as updateSubscriptionGroupConfig does.
+            keyBuf.release();
+        }
+        return super.removeSubscriptionGroupConfig(groupName);
+    }
+} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TableId.java similarity index 59% copy from broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java copy to broker/src/main/java/org/apache/rocketmq/broker/config/v2/TableId.java index 7a90fd62fb..7a61899371 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TableId.java @@ -14,21 +14,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.broker.offset; +package org.apache.rocketmq.broker.config.v2; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.apache.rocketmq.remoting.protocol.RemotingSerializable; +/** + * See <a href="https://book.tidb.io/session1/chapter3/tidb-kv-to-relation.html">Table, Key Value Mapping</a> + */ +public enum TableId { + UNSPECIFIED((short) 0), + CONSUMER_OFFSET((short) 1), + PULL_OFFSET((short) 2), + TOPIC((short) 3), + SUBSCRIPTION_GROUP((short) 4); -public class RocksDBOffsetSerializeWrapper extends RemotingSerializable { - private ConcurrentMap<Integer, Long> offsetTable = new ConcurrentHashMap(16); + private final short value; - public ConcurrentMap<Integer, Long> getOffsetTable() { - return offsetTable; + TableId(short value) { + this.value = value; } - public void setOffsetTable(ConcurrentMap<Integer, Long> offsetTable) { - this.offsetTable = offsetTable; + public short getValue() { + return value; } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TablePrefix.java similarity index 59% copy from broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java copy to broker/src/main/java/org/apache/rocketmq/broker/config/v2/TablePrefix.java index 7a90fd62fb..d16c14d275 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TablePrefix.java @@ -14,21 +14,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.broker.offset; +package org.apache.rocketmq.broker.config.v2; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +public enum TablePrefix { + UNSPECIFIED((byte) 0), + TABLE((byte) 1); -import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + private final byte value; -public class RocksDBOffsetSerializeWrapper extends RemotingSerializable { - private ConcurrentMap<Integer, Long> offsetTable = new ConcurrentHashMap(16); - - public ConcurrentMap<Integer, Long> getOffsetTable() { - return offsetTable; + TablePrefix(byte value) { + this.value = value; } - public void setOffsetTable(ConcurrentMap<Integer, Long> offsetTable) { - this.offsetTable = offsetTable; + public byte getValue() { + return value; } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2.java new file mode 100644 index 0000000000..b1a3d2d85c --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.config.v2; + +import com.alibaba.fastjson2.JSON; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.nio.charset.StandardCharsets; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.topic.TopicConfigManager; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.config.AbstractRocksDBStorage; +import org.apache.rocketmq.common.constant.PermName; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.WriteBatch; + +/** + * Key layout: [table-prefix, 1 byte][table-id, 2 bytes][record-type-prefix, 1 byte][topic-len, 2 bytes][topic-bytes] + * Value layout: [serialization-type, 1 byte][topic-config-bytes] + */ +public class TopicConfigManagerV2 extends TopicConfigManager { + private final ConfigStorage configStorage; + + public TopicConfigManagerV2(BrokerController brokerController, ConfigStorage configStorage) { + super(brokerController); + this.configStorage = configStorage; + } + + @Override + public boolean load() { + return loadDataVersion() && loadTopicConfig(); + } + + public boolean loadDataVersion() { + try { + ConfigHelper.loadDataVersion(configStorage, TableId.TOPIC) + .ifPresent(buf -> ConfigHelper.onDataVersionLoad(buf, dataVersion)); + } catch (RocksDBException e) { + 
log.error("Failed to load data version of topic", e); + return false; + } + return true; + } + + private boolean loadTopicConfig() { + int keyLen = 1 /* table-prefix */ + 2 /* table-id */ + 1 /* record-type-prefix */; + ByteBuf beginKey = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen); + beginKey.writeByte(TablePrefix.TABLE.getValue()); + beginKey.writeShort(TableId.TOPIC.getValue()); + beginKey.writeByte(RecordPrefix.DATA.getValue()); + + ByteBuf endKey = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(keyLen); + endKey.writeByte(TablePrefix.TABLE.getValue()); + endKey.writeShort(TableId.TOPIC.getValue()); + endKey.writeByte(RecordPrefix.DATA.getValue() + 1); + + try (RocksIterator iterator = configStorage.iterate(beginKey.nioBuffer(), endKey.nioBuffer())) { + while (iterator.isValid()) { + byte[] key = iterator.key(); + byte[] value = iterator.value(); + TopicConfig topicConfig = parseTopicConfig(key, value); + if (null != topicConfig) { + super.updateSingleTopicConfigWithoutPersist(topicConfig); + } + iterator.next(); + } + } finally { + beginKey.release(); + endKey.release(); + } + return true; + } + + /** + * Key layout: [table-prefix, 1 byte][table-id, 2 bytes][record-type-prefix, 1 byte][topic-len, 2 bytes][topic-bytes] + * Value layout: [serialization-type, 1 byte][topic-config-bytes] + * + * @param key Topic config key representation in RocksDB + * @param value Topic config value representation in RocksDB + * @return decoded topic config + */ + private TopicConfig parseTopicConfig(byte[] key, byte[] value) { + ByteBuf keyBuf = Unpooled.wrappedBuffer(key); + ByteBuf valueBuf = Unpooled.wrappedBuffer(value); + try { + // Skip table-prefix, table-id, record-type-prefix + keyBuf.readerIndex(4); + short topicLen = keyBuf.readShort(); + assert topicLen == keyBuf.readableBytes(); + CharSequence topic = keyBuf.readCharSequence(topicLen, StandardCharsets.UTF_8); + assert null != topic; + + byte serializationType = valueBuf.readByte(); + if 
(SerializationType.JSON == SerializationType.valueOf(serializationType)) { + CharSequence json = valueBuf.readCharSequence(valueBuf.readableBytes(), StandardCharsets.UTF_8); + TopicConfig topicConfig = JSON.parseObject(json.toString(), TopicConfig.class); + assert topicConfig != null; + assert topic.equals(topicConfig.getTopicName()); + return topicConfig; + } + } finally { + keyBuf.release(); + valueBuf.release(); + } + + return null; + } + + @Override + public synchronized void persist() { + try { + configStorage.flushWAL(); + } catch (RocksDBException e) { + log.error("Failed to flush WAL", e); + } + } + + @Override + public TopicConfig selectTopicConfig(final String topic) { + if (MixAll.isLmq(topic)) { + return simpleLmqTopicConfig(topic); + } + return super.selectTopicConfig(topic); + } + + @Override + public void updateTopicConfig(final TopicConfig topicConfig) { + if (topicConfig == null || MixAll.isLmq(topicConfig.getTopicName())) { + return; + } + super.updateSingleTopicConfigWithoutPersist(topicConfig); + + ByteBuf keyBuf = ConfigHelper.keyBufOf(TableId.TOPIC, topicConfig.getTopicName()); + ByteBuf valueBuf = ConfigHelper.valueBufOf(topicConfig, SerializationType.JSON); + try (WriteBatch writeBatch = new WriteBatch()) { + writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer()); + long stateMachineVersion = brokerController.getMessageStore() != null ? 
brokerController.getMessageStore().getStateMachineVersion() : 0; + ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion); + configStorage.write(writeBatch); + } catch (RocksDBException e) { + log.error("Failed to update topic config", e); + } finally { + keyBuf.release(); + valueBuf.release(); + } + } + + @Override + protected TopicConfig removeTopicConfig(String topicName) { + ByteBuf keyBuf = ConfigHelper.keyBufOf(TableId.TOPIC, topicName); + try (WriteBatch writeBatch = new WriteBatch()) { + writeBatch.delete(keyBuf.nioBuffer()); + long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; + ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion); + configStorage.write(writeBatch); + } catch (RocksDBException e) { + log.error("Failed to delete topic config by topicName={}", topicName, e); + } finally { + keyBuf.release(); + } + return super.removeTopicConfig(topicName); + } + + @Override + public boolean containsTopic(String topic) { + if (MixAll.isLmq(topic)) { + return true; + } + return super.containsTopic(topic); + } + + private TopicConfig simpleLmqTopicConfig(String topic) { + return new TopicConfig(topic, 1, 1, PermName.PERM_READ | PermName.PERM_WRITE); + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/package-info.java similarity index 58% copy from broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java copy to broker/src/main/java/org/apache/rocketmq/broker/config/v2/package-info.java index 7a90fd62fb..1ea216193c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/package-info.java @@ -14,21 +14,13 @@ * See the License for the specific language governing permissions and 
* limitations under the License. */ -package org.apache.rocketmq.broker.offset; +package org.apache.rocketmq.broker.config.v2; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.apache.rocketmq.remoting.protocol.RemotingSerializable; - -public class RocksDBOffsetSerializeWrapper extends RemotingSerializable { - private ConcurrentMap<Integer, Long> offsetTable = new ConcurrentHashMap(16); - - public ConcurrentMap<Integer, Long> getOffsetTable() { - return offsetTable; - } - - public void setOffsetTable(ConcurrentMap<Integer, Long> offsetTable) { - this.offsetTable = offsetTable; - } -} +/* + * <strong>Endian</strong>: we use network byte order for all integral values, i.e., always big-endian. + * + * Unlike v1 config managers, implementations in this package prioritize data integrity and reliability. + * As a result, the RocksDB write-ahead log is always on and changes are flushed immediately. Another significant + * difference is that the heap-based cache is removed because it is unnecessary and duplicates the RocksDB + * MemTable/BlockCache.
+ */ diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java index 403324137c..ea46f1d8a1 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -43,7 +43,7 @@ public class ConsumerOffsetManager extends ConfigManager { protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); public static final String TOPIC_GROUP_SEPARATOR = "@"; - private DataVersion dataVersion = new DataVersion(); + protected DataVersion dataVersion = new DataVersion(); protected ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable = new ConcurrentHashMap<>(512); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java index e6855ef9a2..f62a3e4a09 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java @@ -49,7 +49,7 @@ public class SubscriptionGroupManager extends ConfigManager { private ConcurrentMap<String, ConcurrentMap<String, Integer>> forbiddenTable = new ConcurrentHashMap<>(4); - private final DataVersion dataVersion = new DataVersion(); + protected final DataVersion dataVersion = new DataVersion(); protected transient BrokerController brokerController; public SubscriptionGroupManager() { @@ -143,7 +143,7 @@ public class SubscriptionGroupManager extends ConfigManager { this.persist(); } - private void updateSubscriptionGroupConfigWithoutPersist(SubscriptionGroupConfig config) { + protected void updateSubscriptionGroupConfigWithoutPersist(SubscriptionGroupConfig config) { Map<String, String> 
newAttributes = request(config); Map<String, String> currentAttributes = current(config.getGroupName()); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index 25d3218f2a..4530c10002 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -66,7 +66,7 @@ public class TopicConfigManager extends ConfigManager { private transient final Lock topicConfigTableLock = new ReentrantLock(); protected ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>(1024); - private DataVersion dataVersion = new DataVersion(); + protected DataVersion dataVersion = new DataVersion(); protected transient BrokerController brokerController; public TopicConfigManager() { @@ -497,7 +497,7 @@ public class TopicConfigManager extends ConfigManager { } } - private void updateSingleTopicConfigWithoutPersist(final TopicConfig topicConfig) { + protected void updateSingleTopicConfigWithoutPersist(final TopicConfig topicConfig) { checkNotNull(topicConfig, "topicConfig shouldn't be null"); Map<String, String> newAttributes = request(topicConfig); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManagerTest.java index 58b690c9a3..5a7a2c38ac 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBConsumerOffsetManagerTest.java @@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager; import 
org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.junit.After; diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java index ea6528546d..1b9916d6ac 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBLmqConsumerOffsetManagerTest.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.broker.offset; import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.config.v1.RocksDBLmqConsumerOffsetManager; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.store.config.MessageStoreConfig; diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapperTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapperTest.java index dde0401e8a..c01e63f31f 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapperTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapperTest.java @@ -21,6 +21,7 @@ package org.apache.rocketmq.broker.offset; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.apache.rocketmq.broker.config.v1.RocksDBOffsetSerializeWrapper; import org.junit.Before; import org.junit.Test; diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java index b4800aec24..64c505eb77 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java +++ 
b/broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.commons.collections.MapUtils; import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.Pair; diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java index 04324043fb..d87f513355 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java @@ -36,8 +36,8 @@ import org.apache.rocketmq.broker.client.ConsumerManager; import org.apache.rocketmq.broker.client.net.Broker2Client; import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; import org.apache.rocketmq.broker.schedule.ScheduleMessageService; -import org.apache.rocketmq.broker.subscription.RocksDBSubscriptionGroupManager; -import org.apache.rocketmq.broker.topic.RocksDBTopicConfigManager; +import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager; +import org.apache.rocketmq.broker.config.v1.RocksDBTopicConfigManager; import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.BrokerConfig; diff --git a/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java index 205e642843..26017af8a6 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java +++ 
b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.broker.subscription; import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.remoting.protocol.DataVersion; diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java index b0e0d05736..080e1dd5a3 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigManagerTest.java @@ -24,6 +24,7 @@ import java.util.Optional; import java.util.UUID; import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.config.v1.RocksDBTopicConfigManager; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicAttributes; diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java index 2a72709098..fb345548e4 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/RocksdbTopicConfigTransferTest.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.broker.topic; import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.config.v1.RocksDBTopicConfigManager; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; diff --git 
a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 2acfdd69a5..c651047661 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.common; import org.apache.rocketmq.common.annotation.ImportantField; +import org.apache.rocketmq.common.config.ConfigManagerVersion; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.message.MessageRequestMode; import org.apache.rocketmq.common.metrics.MetricsExporterType; @@ -431,6 +432,11 @@ public class BrokerConfig extends BrokerIdentity { private boolean appendCkAsync = false; + /** + * V2 is recommended in cases where LMQ feature is extensively used. + */ + private String configManagerVersion = ConfigManagerVersion.V1.getVersion(); + public String getConfigBlackList() { return configBlackList; } @@ -1879,4 +1885,12 @@ public class BrokerConfig extends BrokerIdentity { public void setAppendCkAsync(boolean appendCkAsync) { this.appendCkAsync = appendCkAsync; } + + public String getConfigManagerVersion() { + return configManagerVersion; + } + + public void setConfigManagerVersion(String configManagerVersion) { + this.configManagerVersion = configManagerVersion; + } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/config/ConfigManagerVersion.java similarity index 59% rename from broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java rename to common/src/main/java/org/apache/rocketmq/common/config/ConfigManagerVersion.java index 7a90fd62fb..0d5dd6940a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/RocksDBOffsetSerializeWrapper.java +++ 
b/common/src/main/java/org/apache/rocketmq/common/config/ConfigManagerVersion.java @@ -14,21 +14,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.broker.offset; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +package org.apache.rocketmq.common.config; -import org.apache.rocketmq.remoting.protocol.RemotingSerializable; +public enum ConfigManagerVersion { + V1("v1"), + V2("v2"), + ; + private final String version; -public class RocksDBOffsetSerializeWrapper extends RemotingSerializable { - private ConcurrentMap<Integer, Long> offsetTable = new ConcurrentHashMap(16); - - public ConcurrentMap<Integer, Long> getOffsetTable() { - return offsetTable; + ConfigManagerVersion(String version) { + this.version = version; } - public void setOffsetTable(ConcurrentMap<Integer, Long> offsetTable) { - this.offsetTable = offsetTable; + public String getVersion() { + return version; } }