This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new e876bed084 [ISSUE #8955] Fix message buffer not release and dispatch thread exit in tiered storage (#8965) e876bed084 is described below commit e876bed084ca9d642011a9d77b82c7f52b582500 Author: lizhimins <707364...@qq.com> AuthorDate: Fri Nov 22 11:02:50 2024 +0800 [ISSUE #8955] Fix message buffer not release and dispatch thread exit in tiered storage (#8965) --- .../core/MessageStoreDispatcherImpl.java | 40 +++++++--- .../rocketmq/tieredstore/index/IndexStoreFile.java | 88 +++++++++++----------- .../tieredstore/provider/PosixFileSegment.java | 3 +- 3 files changed, 72 insertions(+), 59 deletions(-) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java index 982909c5ee..9b1e53564d 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java @@ -92,8 +92,10 @@ public class MessageStoreDispatcherImpl extends ServiceThread implements Message semaphore.acquire(); this.doScheduleDispatch(flatFile, false) .whenComplete((future, throwable) -> semaphore.release()); - } catch (InterruptedException e) { + } catch (Throwable t) { semaphore.release(); + log.error("MessageStore dispatch error, topic={}, queueId={}", + flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId(), t); } } @@ -156,8 +158,7 @@ public class MessageStoreDispatcherImpl extends ServiceThread implements Message } if (currentOffset < minOffsetInQueue) { - log.warn("MessageDispatcher#dispatch, current offset is too small, " + - "topic={}, queueId={}, offset={}-{}, current={}", + log.warn("MessageDispatcher#dispatch, current offset is too small, topic={}, queueId={}, offset={}-{}, current={}", topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset); flatFileStore.destroyFile(flatFile.getMessageQueue()); flatFileStore.computeIfAbsent(new MessageQueue(topic, brokerName, queueId)); @@ -165,16 +166,14 @@ public class MessageStoreDispatcherImpl extends ServiceThread implements Message } if (currentOffset > maxOffsetInQueue) { - log.warn("MessageDispatcher#dispatch, current offset is too large, " + - "topic: {}, queueId: {}, offset={}-{}, current={}", + log.warn("MessageDispatcher#dispatch, current offset is too large, topic={}, queueId={}, offset={}-{}, current={}", topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset); return CompletableFuture.completedFuture(false); } long interval = TimeUnit.HOURS.toMillis(storeConfig.getCommitLogRollingInterval()); if (flatFile.rollingFile(interval)) { - log.info("MessageDispatcher#dispatch, rolling file, " + - "topic: {}, queueId: {}, offset={}-{}, current={}", + log.info("MessageDispatcher#dispatch, rolling file, topic={}, queueId={}, offset={}-{}, current={}", topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset); } @@ -189,8 +188,20 @@ public class MessageStoreDispatcherImpl extends ServiceThread implements Message ConsumeQueueInterface consumeQueue = defaultStore.getConsumeQueue(topic, queueId); CqUnit cqUnit = consumeQueue.get(currentOffset); + if (cqUnit == null) { + log.warn("MessageDispatcher#dispatch cq not found, topic={}, queueId={}, offset={}-{}, current={}, remain={}", + topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset); + return CompletableFuture.completedFuture(false); + } + SelectMappedBufferResult message = defaultStore.selectOneMessageByOffset(cqUnit.getPos(), cqUnit.getSize()); + if (message == null) { + log.warn("MessageDispatcher#dispatch message not found, topic={}, queueId={}, offset={}-{}, current={}, remain={}", + topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset); + return CompletableFuture.completedFuture(false); + } + boolean timeout = MessageFormatUtil.getStoreTimeStamp(message.getByteBuffer()) + storeConfig.getTieredStoreGroupCommitTimeout() < System.currentTimeMillis(); boolean bufferFull = maxOffsetInQueue - currentOffset > storeConfig.getTieredStoreGroupCommitCount(); @@ -198,6 +209,7 @@ public class MessageStoreDispatcherImpl extends ServiceThread implements Message if (!timeout && !bufferFull && !force) { log.debug("MessageDispatcher#dispatch hold, topic={}, queueId={}, offset={}-{}, current={}, remain={}", topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset); + message.release(); return CompletableFuture.completedFuture(false); } else { if (MessageFormatUtil.getStoreTimeStamp(message.getByteBuffer()) + @@ -205,11 +217,11 @@ public class MessageStoreDispatcherImpl extends ServiceThread implements Message log.warn("MessageDispatcher#dispatch behind too much, topic={}, queueId={}, offset={}-{}, current={}, remain={}", topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset); } else { - log.info("MessageDispatcher#dispatch, topic={}, queueId={}, offset={}-{}, current={}, remain={}", + log.info("MessageDispatcher#dispatch success, topic={}, queueId={}, offset={}-{}, current={}, remain={}", topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset); } + message.release(); } - message.release(); long offset = currentOffset; for (; offset < targetOffset; offset++) { @@ -279,7 +291,7 @@ public class MessageStoreDispatcherImpl extends ServiceThread implements Message } flatFile.release(); } - }, MessageStoreExecutor.getInstance().bufferCommitExecutor); + }, storeExecutor.bufferCommitExecutor); } /** @@ -301,8 +313,12 @@ public class MessageStoreDispatcherImpl extends ServiceThread implements Message public void run() { log.info("{} service started", this.getServiceName()); while (!this.isStopped()) { - flatFileStore.deepCopyFlatFileToList().forEach(this::dispatchWithSemaphore); - this.waitForRunning(Duration.ofSeconds(20).toMillis()); + try { + flatFileStore.deepCopyFlatFileToList().forEach(this::dispatchWithSemaphore); + this.waitForRunning(Duration.ofSeconds(20).toMillis()); + } catch (Throwable t) { + log.error("MessageStore dispatch error", t); + } } log.info("{} service shutdown", this.getServiceName()); } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java index f9604b43e6..25cd634873 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java @@ -38,7 +38,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.store.logfile.DefaultMappedFile; import org.apache.rocketmq.store.logfile.MappedFile; import org.apache.rocketmq.tieredstore.MessageStoreConfig; -import org.apache.rocketmq.tieredstore.MessageStoreExecutor; import org.apache.rocketmq.tieredstore.common.AppendResult; import org.apache.rocketmq.tieredstore.provider.FileSegment; import org.apache.rocketmq.tieredstore.provider.PosixFileSegment; @@ -261,56 +260,55 @@ public class IndexStoreFile implements IndexFile { protected CompletableFuture<List<IndexItem>> queryAsyncFromUnsealedFile( String key, int maxCount, long beginTime, long endTime) { - return CompletableFuture.supplyAsync(() -> { - List<IndexItem> result = new ArrayList<>(); - try { - fileReadWriteLock.readLock().lock(); - if (!UNSEALED.equals(this.fileStatus.get()) && !SEALED.equals(this.fileStatus.get())) { - return result; - } + List<IndexItem> result = new ArrayList<>(); + try { + fileReadWriteLock.readLock().lock(); + if (!UNSEALED.equals(this.fileStatus.get()) && !SEALED.equals(this.fileStatus.get())) { + return CompletableFuture.completedFuture(result); + } - if (mappedFile == null || !mappedFile.hold()) { - return result; - } + if (mappedFile == null || !mappedFile.hold()) { + return CompletableFuture.completedFuture(result); + } - int hashCode = this.hashCode(key); - int slotPosition = this.getSlotPosition(hashCode % this.hashSlotMaxCount); - int slotValue = this.getSlotValue(slotPosition); + int hashCode = this.hashCode(key); + int slotPosition = this.getSlotPosition(hashCode % this.hashSlotMaxCount); + int slotValue = this.getSlotValue(slotPosition); - int left = MAX_QUERY_COUNT; - while (left > 0 && - slotValue > INVALID_INDEX && - slotValue <= this.indexItemCount.get()) { + int left = MAX_QUERY_COUNT; + while (left > 0 && + slotValue > INVALID_INDEX && + slotValue <= this.indexItemCount.get()) { - byte[] bytes = new byte[IndexItem.INDEX_ITEM_SIZE]; - ByteBuffer buffer = this.byteBuffer.duplicate(); - buffer.position(this.getItemPosition(slotValue)); - buffer.get(bytes); - IndexItem indexItem = new IndexItem(bytes); - long storeTimestamp = indexItem.getTimeDiff() + beginTimestamp.get(); - if (hashCode == indexItem.getHashCode() && - beginTime <= storeTimestamp && storeTimestamp <= endTime) { - result.add(indexItem); - if (result.size() > maxCount) { - break; - } + byte[] bytes = new byte[IndexItem.INDEX_ITEM_SIZE]; + ByteBuffer buffer = this.byteBuffer.duplicate(); + buffer.position(this.getItemPosition(slotValue)); + buffer.get(bytes); + IndexItem indexItem = new IndexItem(bytes); + long storeTimestamp = indexItem.getTimeDiff() + beginTimestamp.get(); + if (hashCode == indexItem.getHashCode() && + beginTime <= storeTimestamp && storeTimestamp <= endTime) { + result.add(indexItem); + if (result.size() > maxCount) { + break; } - slotValue = indexItem.getItemIndex(); - left--; } - - log.debug("IndexStoreFile query from unsealed mapped file, timestamp: {}, result size: {}, " + - "key: {}, hashCode: {}, maxCount: {}, timestamp={}-{}", - getTimestamp(), result.size(), key, hashCode, maxCount, beginTime, endTime); - } catch (Exception e) { - log.error("IndexStoreFile query from unsealed mapped file error, timestamp: {}, " + - "key: {}, maxCount: {}, timestamp={}-{}", getTimestamp(), key, maxCount, beginTime, endTime, e); - } finally { - fileReadWriteLock.readLock().unlock(); - mappedFile.release(); + slotValue = indexItem.getItemIndex(); + left--; } - return result; - }, MessageStoreExecutor.getInstance().bufferFetchExecutor); + + log.debug("IndexStoreFile query from unsealed mapped file, timestamp: {}, result size: {}, " + + "key: {}, hashCode: {}, maxCount: {}, timestamp={}-{}", + getTimestamp(), result.size(), key, hashCode, maxCount, beginTime, endTime); + } catch (Exception e) { + log.error("IndexStoreFile query from unsealed mapped file error, timestamp: {}, " + + "key: {}, maxCount: {}, timestamp={}-{}", getTimestamp(), key, maxCount, beginTime, endTime, e); + } finally { + fileReadWriteLock.readLock().unlock(); + mappedFile.release(); + } + + return CompletableFuture.completedFuture(result); } protected CompletableFuture<List<IndexItem>> queryAsyncFromSegmentFile( @@ -465,7 +463,7 @@ public class IndexStoreFile implements IndexFile { fileReadWriteLock.writeLock().lock(); this.fileStatus.set(IndexStatusEnum.SHUTDOWN); if (this.fileSegment != null && this.fileSegment instanceof PosixFileSegment) { - ((PosixFileSegment) this.fileSegment).close(); + this.fileSegment.close(); } if (this.mappedFile != null) { this.mappedFile.shutdown(TimeUnit.SECONDS.toMillis(10)); diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java index fb150c928c..656af2ba1c 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java @@ -30,7 +30,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.tieredstore.MessageStoreConfig; -import org.apache.rocketmq.tieredstore.MessageStoreExecutor; import org.apache.rocketmq.tieredstore.common.FileSegmentType; import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager; import org.apache.rocketmq.tieredstore.stream.FileSegmentInputStream; @@ -230,6 +229,6 @@ public class PosixFileSegment extends FileSegment { return false; } return true; - }, MessageStoreExecutor.getInstance().bufferCommitExecutor); + }); } }