This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 94d9185f2c [ISSUE #8895] Fix NPE when broker shutdown and optimize the log #9094
94d9185f2c is described below

commit 94d9185f2c80bcfb9ffca03a65080f48060f0a39
Author: qianye <wuxingcan....@alibaba-inc.com>
AuthorDate: Tue Jan 14 14:50:56 2025 +0800

    [ISSUE #8895] Fix NPE when broker shutdown and optimize the log #9094
---
 .../common/config/AbstractRocksDBStorage.java        | 20 ++++++++++----------
 .../apache/rocketmq/store/DefaultMessageStore.java   |  4 +---
 .../store/queue/RocksDBConsumeQueueOffsetTable.java  |  2 +-
 3 files changed, 12 insertions(+), 14 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
index 347d92304d..6c0bce5929 100644
--- a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
+++ b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
@@ -18,7 +18,16 @@ package org.apache.rocketmq.common.config;
 
 import com.google.common.collect.Maps;
 import io.netty.buffer.PooledByteBufAllocator;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.utils.ThreadUtils;
@@ -43,16 +52,6 @@ import org.rocksdb.Status;
 import org.rocksdb.WriteBatch;
 import org.rocksdb.WriteOptions;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
 public abstract class AbstractRocksDBStorage {
     protected static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.ROCKSDB_LOGGER_NAME);
 
@@ -381,6 +380,7 @@ public abstract class AbstractRocksDBStorage {
     public synchronized boolean shutdown() {
         try {
             if (!this.loaded) {
+                LOGGER.info("RocksDBStorage is not loaded, shutdown OK. dbPath={}, readOnly={}", this.dbPath, this.readOnly);
                 return true;
             }
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 9d3c46a438..187a0729e8 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -517,11 +517,9 @@ public class DefaultMessageStore implements MessageStore {
             if (this.compactionService != null) {
                 this.compactionService.shutdown();
             }
-
-            if (messageStoreConfig.isRocksdbCQDoubleWriteEnable()) {
+            if (messageStoreConfig.isRocksdbCQDoubleWriteEnable() && this.rocksDBMessageStore != null) {
                 this.rocksDBMessageStore.consumeQueueStore.shutdown();
             }
-
             this.flushConsumeQueueService.shutdown();
             this.allocateMappedFileService.shutdown();
             this.storeCheckpoint.flush();
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java
index a91ae5e244..cb989852fb 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueOffsetTable.java
@@ -144,7 +144,7 @@ public class RocksDBConsumeQueueOffsetTable {
         Function<OffsetEntry, Boolean> predicate = entry -> entry.type == OffsetEntryType.MAXIMUM;
         Consumer<OffsetEntry> fn = entry -> {
             topicQueueMaxCqOffset.putIfAbsent(entry.topic + "-" + entry.queueId, entry.offset);
-            ROCKSDB_LOG.info("Max {}:{} --> {}|{}", entry.topic, entry.queueId, entry.offset, entry.commitLogOffset);
+            log.info("LoadMaxConsumeQueueOffsets Max {}:{} --> {}|{}", entry.topic, entry.queueId, entry.offset, entry.commitLogOffset);
         };
         try {
             forEach(predicate, fn);

Reply via email to