lollipopjin commented on code in PR #8116: URL: https://github.com/apache/rocketmq/pull/8116#discussion_r1613034293
########## broker/src/main/java/org/apache/rocketmq/broker/subscription/RocksDBSubscriptionGroupManager.java: ########## @@ -16,39 +16,116 @@ */ package org.apache.rocketmq.broker.subscription; -import java.io.File; - +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.alibaba.fastjson.serializer.SerializerFeature; import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.common.config.RocksDBConfigManager; +import org.apache.rocketmq.broker.RocksDBConfigManager; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.utils.DataConverter; +import org.apache.rocketmq.remoting.protocol.DataVersion; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; +import org.rocksdb.RocksIterator; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.serializer.SerializerFeature; +import java.io.File; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.BiConsumer; public class RocksDBSubscriptionGroupManager extends SubscriptionGroupManager { + protected RocksDBConfigManager rocksDBConfigManager; + public RocksDBSubscriptionGroupManager(BrokerController brokerController) { super(brokerController, false); - this.rocksDBConfigManager = new RocksDBConfigManager(brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs()); + this.rocksDBConfigManager = new RocksDBConfigManager(rocksdbConfigFilePath(), brokerController.getMessageStoreConfig().getMemTableFlushIntervalMs()); } @Override public boolean load() { - if (!this.rocksDBConfigManager.load(configFilePath(), this::decode0)) { + if (!rocksDBConfigManager.init()) { + return false; + } + if (!loadDataVersion() || !loadSubscriptionGroupAndForbidden()) { return false; } this.init(); return true; } + public boolean loadDataVersion() { + return this.rocksDBConfigManager.loadDataVersion(); + } + + public boolean loadForbidden(BiConsumer<byte[], byte[]> biConsumer) { + try (RocksIterator iterator = this.rocksDBConfigManager.configRocksDBStorage.forbiddenIterator()) { + iterator.seekToFirst(); + while (iterator.isValid()) { + biConsumer.accept(iterator.key(), iterator.value()); + iterator.next(); + } + } + return true; + } + + public boolean loadSubscriptionGroupAndForbidden() { Review Comment: If you want to do as a pipeline case here , use if-else may be a more readable way. ########## broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java: ########## @@ -14,46 +14,66 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.common.config; +package org.apache.rocketmq.broker; +import com.alibaba.fastjson.JSON; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.config.ConfigRocksDBStorage; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.remoting.protocol.DataVersion; import org.rocksdb.FlushOptions; import org.rocksdb.RocksIterator; import org.rocksdb.Statistics; import org.rocksdb.WriteBatch; +import java.nio.charset.StandardCharsets; import java.util.function.BiConsumer; public class RocksDBConfigManager { protected static final Logger BROKER_LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - - protected volatile boolean isStop = false; - protected ConfigRocksDBStorage configRocksDBStorage = null; + public volatile boolean isStop = false; + public ConfigRocksDBStorage configRocksDBStorage = null; private FlushOptions flushOptions = null; private volatile long lastFlushMemTableMicroSecond = 0; + + private final String filePath; private final long memTableFlushInterval; + private DataVersion kvDataVersion = new DataVersion(); + - public RocksDBConfigManager(long memTableFlushInterval) { + public RocksDBConfigManager(String filePath, long memTableFlushInterval) { + this.filePath = filePath; this.memTableFlushInterval = memTableFlushInterval; } - public boolean load(String configFilePath, BiConsumer<byte[], byte[]> biConsumer) { + public boolean init() { this.isStop = false; - this.configRocksDBStorage = new ConfigRocksDBStorage(configFilePath); - if (!this.configRocksDBStorage.start()) { - return false; - } - RocksIterator iterator = this.configRocksDBStorage.iterator(); + this.configRocksDBStorage = new ConfigRocksDBStorage(filePath); + return this.configRocksDBStorage.start(); + } + public final boolean loadDataVersion() { + String currDataVersionString = null; try { + byte[] dataVersion = this.configRocksDBStorage.getKvDataVersion(); + if (dataVersion != null && dataVersion.length > 0) { + currDataVersionString = new String(dataVersion, StandardCharsets.UTF_8); + } + kvDataVersion = StringUtils.isNotBlank(currDataVersionString) ? JSON.parseObject(currDataVersionString, DataVersion.class) : new DataVersion(); + return true; + } catch (Exception e) { + throw new RuntimeException(e); Review Comment: How about print some information for the exception here? ########## broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java: ########## @@ -14,46 +14,66 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.common.config; +package org.apache.rocketmq.broker; +import com.alibaba.fastjson.JSON; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.config.ConfigRocksDBStorage; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.remoting.protocol.DataVersion; import org.rocksdb.FlushOptions; import org.rocksdb.RocksIterator; import org.rocksdb.Statistics; import org.rocksdb.WriteBatch; +import java.nio.charset.StandardCharsets; import java.util.function.BiConsumer; public class RocksDBConfigManager { protected static final Logger BROKER_LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - - protected volatile boolean isStop = false; - protected ConfigRocksDBStorage configRocksDBStorage = null; + public volatile boolean isStop = false; + public ConfigRocksDBStorage configRocksDBStorage = null; private FlushOptions flushOptions = null; private volatile long lastFlushMemTableMicroSecond = 0; + + private final String filePath; private final long memTableFlushInterval; + private DataVersion kvDataVersion = new DataVersion(); + - public RocksDBConfigManager(long memTableFlushInterval) { + public RocksDBConfigManager(String filePath, long memTableFlushInterval) { + this.filePath = filePath; this.memTableFlushInterval = memTableFlushInterval; } - public boolean load(String configFilePath, BiConsumer<byte[], byte[]> biConsumer) { + public boolean init() { this.isStop = false; - this.configRocksDBStorage = new ConfigRocksDBStorage(configFilePath); - if (!this.configRocksDBStorage.start()) { - return false; - } - RocksIterator iterator = this.configRocksDBStorage.iterator(); + this.configRocksDBStorage = new ConfigRocksDBStorage(filePath); + return this.configRocksDBStorage.start(); + } + public final boolean loadDataVersion() { Review Comment: Why use public final for method here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org