This is an automated email from the ASF dual-hosted git repository. caigy pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 86d59d2485 Add the ability to write ConsumeQueue using fileChannel to prevent JVM crashes in some situations (#8403) 86d59d2485 is described below commit 86d59d2485b5fed162db3743e11c0902de3e34ad Author: rongtong <jinrongton...@mails.ucas.ac.cn> AuthorDate: Mon Jul 22 17:20:11 2024 +0800 Add the ability to write ConsumeQueue using fileChannel to prevent JVM crashes in some situations (#8403) --- .../org/apache/rocketmq/store/ConsumeQueue.java | 15 ++++++++++-- .../rocketmq/store/config/MessageStoreConfig.java | 10 ++++++++ .../rocketmq/store/logfile/DefaultMappedFile.java | 27 ++++++++++++++++++---- .../apache/rocketmq/store/logfile/MappedFile.java | 11 +++++++++ .../rocketmq/store/queue/BatchConsumeQueue.java | 7 +++++- .../rocketmq/store/queue/SparseConsumeQueue.java | 10 +++++++- 6 files changed, 71 insertions(+), 9 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index 569cc3cfaa..eb8af4ab19 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -833,7 +833,13 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle { } } this.setMaxPhysicOffset(offset + size); - return mappedFile.appendMessage(this.byteBufferIndex.array()); + boolean appendResult; + if (messageStore.getMessageStoreConfig().isPutConsumeQueueDataByFileChannel()) { + appendResult = mappedFile.appendMessageUsingFileChannel(this.byteBufferIndex.array()); + } else { + appendResult = mappedFile.appendMessage(this.byteBufferIndex.array()); + } + return appendResult; } return false; } @@ -846,7 +852,12 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle { int until = (int) (untilWhere % this.mappedFileQueue.getMappedFileSize()); for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) { - mappedFile.appendMessage(byteBuffer.array()); + if (messageStore.getMessageStoreConfig().isPutConsumeQueueDataByFileChannel()) { + mappedFile.appendMessageUsingFileChannel(byteBuffer.array()); + } else { + mappedFile.appendMessage(byteBuffer.array()); + } + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 0060b144cf..5b2a1931b3 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -419,6 +419,8 @@ public class MessageStoreConfig { */ private boolean readUnCommitted = false; + private boolean putConsumeQueueDataByFileChannel = true; + public boolean isEnabledAppendPropCRC() { return enabledAppendPropCRC; } @@ -1832,4 +1834,12 @@ public class MessageStoreConfig { public void setReadUnCommitted(boolean readUnCommitted) { this.readUnCommitted = readUnCommitted; } + + public boolean isPutConsumeQueueDataByFileChannel() { + return putConsumeQueueDataByFileChannel; + } + + public void setPutConsumeQueueDataByFileChannel(boolean putConsumeQueueDataByFileChannel) { + this.putConsumeQueueDataByFileChannel = putConsumeQueueDataByFileChannel; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java index 03477c3324..c490d093a1 100644 --- a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java @@ -97,14 +97,14 @@ public class DefaultMappedFile extends AbstractMappedFile { protected long mappedByteBufferAccessCountSinceLastSwap = 0L; /** - * If this mapped file belongs to consume queue, this field stores store-timestamp of first message referenced - * by this logical queue. + * If this mapped file belongs to consume queue, this field stores store-timestamp of first message referenced by + * this logical queue. */ private long startTimestamp = -1; /** - * If this mapped file belongs to consume queue, this field stores store-timestamp of last message referenced - * by this logical queue. + * If this mapped file belongs to consume queue, this field stores store-timestamp of last message referenced by + * this logical queue. */ private long stopTimestamp = -1; @@ -357,6 +357,24 @@ public class DefaultMappedFile extends AbstractMappedFile { return false; } + @Override + public boolean appendMessageUsingFileChannel(byte[] data) { + int currentPos = WROTE_POSITION_UPDATER.get(this); + + if ((currentPos + data.length) <= this.fileSize) { + try { + this.fileChannel.position(currentPos); + this.fileChannel.write(ByteBuffer.wrap(data, 0, data.length)); + } catch (Throwable e) { + log.error("Error occurred when append message to mappedFile.", e); + } + WROTE_POSITION_UPDATER.addAndGet(this, data.length); + return true; + } + + return false; + } + /** * @return The current flushed position */ @@ -840,7 +858,6 @@ public class DefaultMappedFile extends AbstractMappedFile { this.stopTimestamp = stopTimestamp; } - public Iterator<SelectMappedBufferResult> iterator(int startPos) { return new Itr(startPos); } diff --git a/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java index dfcf66f088..fd70d6c563 100644 --- a/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java @@ -101,12 +101,23 @@ public interface MappedFile { /** * Appends a raw message data represents by a byte array to the current {@code MappedFile}. + * Using mappedByteBuffer * * @param data the byte array to append * @return true if success; false otherwise. */ boolean appendMessage(byte[] data); + + /** + * Appends a raw message data represents by a byte array to the current {@code MappedFile}. + * Using fileChannel + * + * @param data the byte array to append + * @return true if success; false otherwise. + */ + boolean appendMessageUsingFileChannel(byte[] data); + /** * Appends a raw message data represents by a byte array to the current {@code MappedFile}. * diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java index 7108c835c8..1617182724 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java @@ -587,7 +587,12 @@ public class BatchConsumeQueue implements ConsumeQueueInterface { MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(this.mappedFileQueue.getMaxOffset()); if (mappedFile != null) { boolean isNewFile = isNewFile(mappedFile); - boolean appendRes = mappedFile.appendMessage(this.byteBufferItem.array()); + boolean appendRes; + if (messageStore.getMessageStoreConfig().isPutConsumeQueueDataByFileChannel()) { + appendRes = mappedFile.appendMessageUsingFileChannel(this.byteBufferItem.array()); + } else { + appendRes = mappedFile.appendMessage(this.byteBufferItem.array()); + } if (appendRes) { maxMsgPhyOffsetInCommitLog = offset; maxOffsetInQueue = msgBaseOffset + batchSize; diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java index 4a5f3a93b1..7e14de30ab 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java @@ -262,7 +262,15 @@ public class SparseConsumeQueue extends BatchConsumeQueue { this.byteBufferItem.putShort((short)0); this.byteBufferItem.putInt(INVALID_POS); this.byteBufferItem.putInt(0); // 4 bytes reserved - boolean appendRes = mappedFile.appendMessage(this.byteBufferItem.array()); + + boolean appendRes; + + if (messageStore.getMessageStoreConfig().isPutConsumeQueueDataByFileChannel()) { + appendRes = mappedFile.appendMessageUsingFileChannel(this.byteBufferItem.array()); + } else { + appendRes = mappedFile.appendMessage(this.byteBufferItem.array()); + } + if (!appendRes) { log.error("append end position info into {} failed", mappedFile.getFileName()); }