This is an automated email from the ASF dual-hosted git repository. lollipop pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 94d9185f2c [ISSUE #8895] Fix NPE when broker shutdown and optimize the log #9094 94d9185f2c is described below commit 94d9185f2c80bcfb9ffca03a65080f48060f0a39 Author: qianye <wuxingcan....@alibaba-inc.com> AuthorDate: Tue Jan 14 14:50:56 2025 +0800 [ISSUE #8895] Fix NPE when broker shutdown and optimize the log #9094 --- .../common/config/AbstractRocksDBStorage.java | 20 ++++++++++---------- .../apache/rocketmq/store/DefaultMessageStore.java | 4 +--- .../store/queue/RocksDBConsumeQueueOffsetTable.java | 2 +- 3 files changed, 12 insertions(+), 14 deletions(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java index 347d92304d..6c0bce5929 100644 --- a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java +++ b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java @@ -18,7 +18,16 @@ package org.apache.rocketmq.common.config; import com.google.common.collect.Maps; import io.netty.buffer.PooledByteBufAllocator; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.utils.ThreadUtils; @@ -43,16 +52,6 @@ import org.rocksdb.Status; import org.rocksdb.WriteBatch; import org.rocksdb.WriteOptions; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.Semaphore; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - public abstract class AbstractRocksDBStorage { protected static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.ROCKSDB_LOGGER_NAME); @@ -381,6 +380,7 @@ public abstract class AbstractRocksDBStorage { public synchronized boolean shutdown() { try { if (!this.loaded) { + LOGGER.info("RocksDBStorage is not loaded, shutdown OK. dbPath={}, readOnly={}", this.dbPath, this.readOnly); return true; } diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 9d3c46a438..187a0729e8 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -517,11 +517,9 @@ public class DefaultMessageStore implements MessageStore { if (this.compactionService != null) { this.compactionService.shutdown(); } - - if (messageStoreConfig.isRocksdbCQDoubleWriteEnable()) { + if (messageStoreConfig.isRocksdbCQDoubleWriteEnable() && this.rocksDBMessageStore != null) { this.rocksDBMessageStore.consumeQueueStore.shutdown(); } - this.flushConsumeQueueService.shutdown(); this.allocateMappedFileService.shutdown(); this.storeCheckpoint.flush(); diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java index a91ae5e244..cb989852fb 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java @@ -144,7 +144,7 @@ public class RocksDBConsumeQueueOffsetTable { Function<OffsetEntry, Boolean> predicate = entry -> entry.type == OffsetEntryType.MAXIMUM; Consumer<OffsetEntry> fn = entry -> { topicQueueMaxCqOffset.putIfAbsent(entry.topic + "-" + entry.queueId, entry.offset); - ROCKSDB_LOG.info("Max {}:{} --> {}|{}", entry.topic, entry.queueId, entry.offset, entry.commitLogOffset); + log.info("LoadMaxConsumeQueueOffsets Max {}:{} --> {}|{}", entry.topic, entry.queueId, entry.offset, entry.commitLogOffset); }; try { forEach(predicate, fn);