This is an automated email from the ASF dual-hosted git repository.

caigy pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 86d59d2485 Add the ability to write ConsumeQueue using fileChannel to 
prevent JVM crashes in some situations (#8403)
86d59d2485 is described below

commit 86d59d2485b5fed162db3743e11c0902de3e34ad
Author: rongtong <jinrongton...@mails.ucas.ac.cn>
AuthorDate: Mon Jul 22 17:20:11 2024 +0800

    Add the ability to write ConsumeQueue using fileChannel to prevent JVM 
crashes in some situations (#8403)
---
 .../org/apache/rocketmq/store/ConsumeQueue.java    | 15 ++++++++++--
 .../rocketmq/store/config/MessageStoreConfig.java  | 10 ++++++++
 .../rocketmq/store/logfile/DefaultMappedFile.java  | 27 ++++++++++++++++++----
 .../apache/rocketmq/store/logfile/MappedFile.java  | 11 +++++++++
 .../rocketmq/store/queue/BatchConsumeQueue.java    |  7 +++++-
 .../rocketmq/store/queue/SparseConsumeQueue.java   | 10 +++++++-
 6 files changed, 71 insertions(+), 9 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 569cc3cfaa..eb8af4ab19 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -833,7 +833,13 @@ public class ConsumeQueue implements 
ConsumeQueueInterface, FileQueueLifeCycle {
                 }
             }
             this.setMaxPhysicOffset(offset + size);
-            return mappedFile.appendMessage(this.byteBufferIndex.array());
+            boolean appendResult;
+            if 
(messageStore.getMessageStoreConfig().isPutConsumeQueueDataByFileChannel()) {
+                appendResult = 
mappedFile.appendMessageUsingFileChannel(this.byteBufferIndex.array());
+            } else {
+                appendResult = 
mappedFile.appendMessage(this.byteBufferIndex.array());
+            }
+            return appendResult;
         }
         return false;
     }
@@ -846,7 +852,12 @@ public class ConsumeQueue implements 
ConsumeQueueInterface, FileQueueLifeCycle {
 
         int until = (int) (untilWhere % 
this.mappedFileQueue.getMappedFileSize());
         for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) {
-            mappedFile.appendMessage(byteBuffer.array());
+            if 
(messageStore.getMessageStoreConfig().isPutConsumeQueueDataByFileChannel()) {
+                mappedFile.appendMessageUsingFileChannel(byteBuffer.array());
+            } else {
+                mappedFile.appendMessage(byteBuffer.array());
+            }
+
         }
     }
 
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 0060b144cf..5b2a1931b3 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -419,6 +419,8 @@ public class MessageStoreConfig {
      */
     private boolean readUnCommitted = false;
 
+    private boolean putConsumeQueueDataByFileChannel = true;
+
     public boolean isEnabledAppendPropCRC() {
         return enabledAppendPropCRC;
     }
@@ -1832,4 +1834,12 @@ public class MessageStoreConfig {
     public void setReadUnCommitted(boolean readUnCommitted) {
         this.readUnCommitted = readUnCommitted;
     }
+
+    public boolean isPutConsumeQueueDataByFileChannel() {
+        return putConsumeQueueDataByFileChannel;
+    }
+
+    public void setPutConsumeQueueDataByFileChannel(boolean 
putConsumeQueueDataByFileChannel) {
+        this.putConsumeQueueDataByFileChannel = 
putConsumeQueueDataByFileChannel;
+    }
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java 
b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
index 03477c3324..c490d093a1 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
@@ -97,14 +97,14 @@ public class DefaultMappedFile extends AbstractMappedFile {
     protected long mappedByteBufferAccessCountSinceLastSwap = 0L;
 
     /**
-     * If this mapped file belongs to consume queue, this field stores 
store-timestamp of first message referenced
-     * by this logical queue.
+     * If this mapped file belongs to consume queue, this field stores 
store-timestamp of first message referenced by
+     * this logical queue.
      */
     private long startTimestamp = -1;
 
     /**
-     * If this mapped file belongs to consume queue, this field stores 
store-timestamp of last message referenced
-     * by this logical queue.
+     * If this mapped file belongs to consume queue, this field stores 
store-timestamp of last message referenced by
+     * this logical queue.
      */
     private long stopTimestamp = -1;
 
@@ -357,6 +357,24 @@ public class DefaultMappedFile extends AbstractMappedFile {
         return false;
     }
 
+    @Override
+    public boolean appendMessageUsingFileChannel(byte[] data) {
+        int currentPos = WROTE_POSITION_UPDATER.get(this);
+
+        if ((currentPos + data.length) <= this.fileSize) {
+            try {
+                this.fileChannel.position(currentPos);
+                this.fileChannel.write(ByteBuffer.wrap(data, 0, data.length));
+            } catch (Throwable e) {
+                log.error("Error occurred when append message to mappedFile.", 
e);
+            }
+            WROTE_POSITION_UPDATER.addAndGet(this, data.length);
+            return true;
+        }
+
+        return false;
+    }
+
     /**
      * @return The current flushed position
      */
@@ -840,7 +858,6 @@ public class DefaultMappedFile extends AbstractMappedFile {
         this.stopTimestamp = stopTimestamp;
     }
 
-
     public Iterator<SelectMappedBufferResult> iterator(int startPos) {
         return new Itr(startPos);
     }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java 
b/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java
index dfcf66f088..fd70d6c563 100644
--- a/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java
@@ -101,12 +101,23 @@ public interface MappedFile {
 
     /**
      * Appends a raw message data represents by a byte array to the current 
{@code MappedFile}.
+     * Using mappedByteBuffer
      *
      * @param data the byte array to append
      * @return true if success; false otherwise.
      */
     boolean appendMessage(byte[] data);
 
+
+    /**
+     * Appends a raw message data represents by a byte array to the current 
{@code MappedFile}.
+     * Using fileChannel
+     *
+     * @param data the byte array to append
+     * @return true if success; false otherwise.
+     */
+    boolean appendMessageUsingFileChannel(byte[] data);
+
     /**
      * Appends a raw message data represents by a byte array to the current 
{@code MappedFile}.
      *
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
index 7108c835c8..1617182724 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
@@ -587,7 +587,12 @@ public class BatchConsumeQueue implements 
ConsumeQueueInterface {
         MappedFile mappedFile = 
this.mappedFileQueue.getLastMappedFile(this.mappedFileQueue.getMaxOffset());
         if (mappedFile != null) {
             boolean isNewFile = isNewFile(mappedFile);
-            boolean appendRes = 
mappedFile.appendMessage(this.byteBufferItem.array());
+            boolean appendRes;
+            if 
(messageStore.getMessageStoreConfig().isPutConsumeQueueDataByFileChannel()) {
+                appendRes = 
mappedFile.appendMessageUsingFileChannel(this.byteBufferItem.array());
+            } else {
+                appendRes = 
mappedFile.appendMessage(this.byteBufferItem.array());
+            }
             if (appendRes) {
                 maxMsgPhyOffsetInCommitLog = offset;
                 maxOffsetInQueue = msgBaseOffset + batchSize;
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java
index 4a5f3a93b1..7e14de30ab 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java
@@ -262,7 +262,15 @@ public class SparseConsumeQueue extends BatchConsumeQueue {
             this.byteBufferItem.putShort((short)0);
             this.byteBufferItem.putInt(INVALID_POS);
             this.byteBufferItem.putInt(0); // 4 bytes reserved
-            boolean appendRes = 
mappedFile.appendMessage(this.byteBufferItem.array());
+
+            boolean appendRes;
+
+            if 
(messageStore.getMessageStoreConfig().isPutConsumeQueueDataByFileChannel()) {
+                appendRes = 
mappedFile.appendMessageUsingFileChannel(this.byteBufferItem.array());
+            } else {
+                appendRes = 
mappedFile.appendMessage(this.byteBufferItem.array());
+            }
+
             if (!appendRes) {
                 log.error("append end position info into {} failed", 
mappedFile.getFileName());
             }

Reply via email to