This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new e876bed084 [ISSUE #8955] Fix message buffer not release and dispatch thread exit in tiered storage (#8965)
e876bed084 is described below

commit e876bed084ca9d642011a9d77b82c7f52b582500
Author: lizhimins <707364...@qq.com>
AuthorDate: Fri Nov 22 11:02:50 2024 +0800

    [ISSUE #8955] Fix message buffer not release and dispatch thread exit in tiered storage (#8965)
---
 .../core/MessageStoreDispatcherImpl.java           | 40 +++++++---
 .../rocketmq/tieredstore/index/IndexStoreFile.java | 88 +++++++++++-----------
 .../tieredstore/provider/PosixFileSegment.java     |  3 +-
 3 files changed, 72 insertions(+), 59 deletions(-)

diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java
index 982909c5ee..9b1e53564d 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java
@@ -92,8 +92,10 @@ public class MessageStoreDispatcherImpl extends ServiceThread implements Message
             semaphore.acquire();
             this.doScheduleDispatch(flatFile, false)
                 .whenComplete((future, throwable) -> semaphore.release());
-        } catch (InterruptedException e) {
+        } catch (Throwable t) {
             semaphore.release();
+            log.error("MessageStore dispatch error, topic={}, queueId={}",
+                flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId(), t);
         }
     }
 
@@ -156,8 +158,7 @@ public class MessageStoreDispatcherImpl extends ServiceThread implements Message
             }
 
             if (currentOffset < minOffsetInQueue) {
-                log.warn("MessageDispatcher#dispatch, current offset is too small, " +
-                        "topic={}, queueId={}, offset={}-{}, current={}",
+                log.warn("MessageDispatcher#dispatch, current offset is too small, topic={}, queueId={}, offset={}-{}, current={}",
                     topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset);
                 flatFileStore.destroyFile(flatFile.getMessageQueue());
                 flatFileStore.computeIfAbsent(new MessageQueue(topic, brokerName, queueId));
@@ -165,16 +166,14 @@ public class MessageStoreDispatcherImpl extends ServiceThread implements Message
             }
 
             if (currentOffset > maxOffsetInQueue) {
-                log.warn("MessageDispatcher#dispatch, current offset is too large, " +
-                        "topic: {}, queueId: {}, offset={}-{}, current={}",
+                log.warn("MessageDispatcher#dispatch, current offset is too large, topic={}, queueId={}, offset={}-{}, current={}",
                     topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset);
                 return CompletableFuture.completedFuture(false);
             }
 
             long interval = TimeUnit.HOURS.toMillis(storeConfig.getCommitLogRollingInterval());
             if (flatFile.rollingFile(interval)) {
-                log.info("MessageDispatcher#dispatch, rolling file, " +
-                        "topic: {}, queueId: {}, offset={}-{}, current={}",
+                log.info("MessageDispatcher#dispatch, rolling file, topic={}, queueId={}, offset={}-{}, current={}",
                     topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset);
             }
 
@@ -189,8 +188,20 @@ public class MessageStoreDispatcherImpl extends ServiceThread implements Message
 
             ConsumeQueueInterface consumeQueue = defaultStore.getConsumeQueue(topic, queueId);
             CqUnit cqUnit = consumeQueue.get(currentOffset);
+            if (cqUnit == null) {
+                log.warn("MessageDispatcher#dispatch cq not found, topic={}, queueId={}, offset={}-{}, current={}, remain={}",
+                    topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset);
+                return CompletableFuture.completedFuture(false);
+            }
+
             SelectMappedBufferResult message =
                 defaultStore.selectOneMessageByOffset(cqUnit.getPos(), cqUnit.getSize());
+            if (message == null) {
+                log.warn("MessageDispatcher#dispatch message not found, topic={}, queueId={}, offset={}-{}, current={}, remain={}",
+                    topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset);
+                return CompletableFuture.completedFuture(false);
+            }
+
             boolean timeout = MessageFormatUtil.getStoreTimeStamp(message.getByteBuffer()) +
                 storeConfig.getTieredStoreGroupCommitTimeout() < System.currentTimeMillis();
             boolean bufferFull = maxOffsetInQueue - currentOffset > storeConfig.getTieredStoreGroupCommitCount();
@@ -198,6 +209,7 @@ public class MessageStoreDispatcherImpl extends ServiceThread implements Message
             if (!timeout && !bufferFull && !force) {
                 log.debug("MessageDispatcher#dispatch hold, topic={}, queueId={}, offset={}-{}, current={}, remain={}",
                     topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset);
+                message.release();
                 return CompletableFuture.completedFuture(false);
             } else {
                 if (MessageFormatUtil.getStoreTimeStamp(message.getByteBuffer()) +
@@ -205,11 +217,11 @@ public class MessageStoreDispatcherImpl extends ServiceThread implements Message
                     log.warn("MessageDispatcher#dispatch behind too much, topic={}, queueId={}, offset={}-{}, current={}, remain={}",
                         topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset);
                 } else {
-                    log.info("MessageDispatcher#dispatch, topic={}, queueId={}, offset={}-{}, current={}, remain={}",
+                    log.info("MessageDispatcher#dispatch success, topic={}, queueId={}, offset={}-{}, current={}, remain={}",
                         topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset);
                 }
+                message.release();
             }
-            message.release();
 
             long offset = currentOffset;
             for (; offset < targetOffset; offset++) {
@@ -279,7 +291,7 @@ public class MessageStoreDispatcherImpl extends ServiceThread implements Message
                 }
                 flatFile.release();
             }
-        }, MessageStoreExecutor.getInstance().bufferCommitExecutor);
+        }, storeExecutor.bufferCommitExecutor);
     }
 
     /**
@@ -301,8 +313,12 @@ public class MessageStoreDispatcherImpl extends ServiceThread implements Message
     public void run() {
         log.info("{} service started", this.getServiceName());
         while (!this.isStopped()) {
-            flatFileStore.deepCopyFlatFileToList().forEach(this::dispatchWithSemaphore);
-            this.waitForRunning(Duration.ofSeconds(20).toMillis());
+            try {
+                flatFileStore.deepCopyFlatFileToList().forEach(this::dispatchWithSemaphore);
+                this.waitForRunning(Duration.ofSeconds(20).toMillis());
+            } catch (Throwable t) {
+                log.error("MessageStore dispatch error", t);
+            }
         }
         log.info("{} service shutdown", this.getServiceName());
     }
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
index f9604b43e6..25cd634873 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
@@ -38,7 +38,6 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.store.logfile.DefaultMappedFile;
 import org.apache.rocketmq.store.logfile.MappedFile;
 import org.apache.rocketmq.tieredstore.MessageStoreConfig;
-import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
 import org.apache.rocketmq.tieredstore.common.AppendResult;
 import org.apache.rocketmq.tieredstore.provider.FileSegment;
 import org.apache.rocketmq.tieredstore.provider.PosixFileSegment;
@@ -261,56 +260,55 @@ public class IndexStoreFile implements IndexFile {
     protected CompletableFuture<List<IndexItem>> queryAsyncFromUnsealedFile(
         String key, int maxCount, long beginTime, long endTime) {
 
-        return CompletableFuture.supplyAsync(() -> {
-            List<IndexItem> result = new ArrayList<>();
-            try {
-                fileReadWriteLock.readLock().lock();
-                if (!UNSEALED.equals(this.fileStatus.get()) && !SEALED.equals(this.fileStatus.get())) {
-                    return result;
-                }
+        List<IndexItem> result = new ArrayList<>();
+        try {
+            fileReadWriteLock.readLock().lock();
+            if (!UNSEALED.equals(this.fileStatus.get()) && !SEALED.equals(this.fileStatus.get())) {
+                return CompletableFuture.completedFuture(result);
+            }
 
-                if (mappedFile == null || !mappedFile.hold()) {
-                    return result;
-                }
+            if (mappedFile == null || !mappedFile.hold()) {
+                return CompletableFuture.completedFuture(result);
+            }
 
-                int hashCode = this.hashCode(key);
-                int slotPosition = this.getSlotPosition(hashCode % this.hashSlotMaxCount);
-                int slotValue = this.getSlotValue(slotPosition);
+            int hashCode = this.hashCode(key);
+            int slotPosition = this.getSlotPosition(hashCode % this.hashSlotMaxCount);
+            int slotValue = this.getSlotValue(slotPosition);
 
-                int left = MAX_QUERY_COUNT;
-                while (left > 0 &&
-                    slotValue > INVALID_INDEX &&
-                    slotValue <= this.indexItemCount.get()) {
+            int left = MAX_QUERY_COUNT;
+            while (left > 0 &&
+                slotValue > INVALID_INDEX &&
+                slotValue <= this.indexItemCount.get()) {
 
-                    byte[] bytes = new byte[IndexItem.INDEX_ITEM_SIZE];
-                    ByteBuffer buffer = this.byteBuffer.duplicate();
-                    buffer.position(this.getItemPosition(slotValue));
-                    buffer.get(bytes);
-                    IndexItem indexItem = new IndexItem(bytes);
-                    long storeTimestamp = indexItem.getTimeDiff() + beginTimestamp.get();
-                    if (hashCode == indexItem.getHashCode() &&
-                        beginTime <= storeTimestamp && storeTimestamp <= endTime) {
-                        result.add(indexItem);
-                        if (result.size() > maxCount) {
-                            break;
-                        }
+                byte[] bytes = new byte[IndexItem.INDEX_ITEM_SIZE];
+                ByteBuffer buffer = this.byteBuffer.duplicate();
+                buffer.position(this.getItemPosition(slotValue));
+                buffer.get(bytes);
+                IndexItem indexItem = new IndexItem(bytes);
+                long storeTimestamp = indexItem.getTimeDiff() + beginTimestamp.get();
+                if (hashCode == indexItem.getHashCode() &&
+                    beginTime <= storeTimestamp && storeTimestamp <= endTime) {
+                    result.add(indexItem);
+                    if (result.size() > maxCount) {
+                        break;
                     }
-                    slotValue = indexItem.getItemIndex();
-                    left--;
                 }
-
-                log.debug("IndexStoreFile query from unsealed mapped file, timestamp: {}, result size: {}, " +
-                        "key: {}, hashCode: {}, maxCount: {}, timestamp={}-{}",
-                    getTimestamp(), result.size(), key, hashCode, maxCount, beginTime, endTime);
-            } catch (Exception e) {
-                log.error("IndexStoreFile query from unsealed mapped file error, timestamp: {}, " +
-                    "key: {}, maxCount: {}, timestamp={}-{}", getTimestamp(), key, maxCount, beginTime, endTime, e);
-            } finally {
-                fileReadWriteLock.readLock().unlock();
-                mappedFile.release();
+                slotValue = indexItem.getItemIndex();
+                left--;
             }
-            return result;
-        }, MessageStoreExecutor.getInstance().bufferFetchExecutor);
+
+            log.debug("IndexStoreFile query from unsealed mapped file, timestamp: {}, result size: {}, " +
+                    "key: {}, hashCode: {}, maxCount: {}, timestamp={}-{}",
+                getTimestamp(), result.size(), key, hashCode, maxCount, beginTime, endTime);
+        } catch (Exception e) {
+            log.error("IndexStoreFile query from unsealed mapped file error, timestamp: {}, " +
+                "key: {}, maxCount: {}, timestamp={}-{}", getTimestamp(), key, maxCount, beginTime, endTime, e);
+        } finally {
+            fileReadWriteLock.readLock().unlock();
+            mappedFile.release();
+        }
+
+        return CompletableFuture.completedFuture(result);
     }
 
     protected CompletableFuture<List<IndexItem>> queryAsyncFromSegmentFile(
@@ -465,7 +463,7 @@ public class IndexStoreFile implements IndexFile {
             fileReadWriteLock.writeLock().lock();
             this.fileStatus.set(IndexStatusEnum.SHUTDOWN);
             if (this.fileSegment != null && this.fileSegment instanceof PosixFileSegment) {
-                ((PosixFileSegment) this.fileSegment).close();
+                this.fileSegment.close();
             }
             if (this.mappedFile != null) {
                 this.mappedFile.shutdown(TimeUnit.SECONDS.toMillis(10));
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
index fb150c928c..656af2ba1c 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
@@ -30,7 +30,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.tieredstore.MessageStoreConfig;
-import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
 import org.apache.rocketmq.tieredstore.common.FileSegmentType;
 import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
 import org.apache.rocketmq.tieredstore.stream.FileSegmentInputStream;
@@ -230,6 +229,6 @@ public class PosixFileSegment extends FileSegment {
                 return false;
             }
             return true;
-        }, MessageStoreExecutor.getInstance().bufferCommitExecutor);
+        });
     }
 }

Reply via email to