This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 46962c262c [ISSUE #7523] Message will flush timeout when 
transientStorePoolEnable=true and flushDiskType=SYNC_FLUSH (#7524)
46962c262c is described below

commit 46962c262c37554ff09afe9e02c7baf66a5ecc73
Author: fujian-zfj <2573259...@qq.com>
AuthorDate: Thu Nov 2 13:47:16 2023 +0800

    [ISSUE #7523] Message will flush timeout when transientStorePoolEnable=true 
and flushDiskType=SYNC_FLUSH (#7524)
    
    * typo int readme[ecosystem]
    
    * enableTransientPool and sync_flush will cause flush_time_out
    
    * polish
    
    * add log
---
 .../main/java/org/apache/rocketmq/store/CommitLog.java  | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java 
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 3d3ee86b8f..6c3afde70f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -1634,12 +1634,21 @@ public class CommitLog implements Swappable {
         private void doCommit() {
             if (!this.requestsRead.isEmpty()) {
                 for (GroupCommitRequest req : this.requestsRead) {
-                    // There may be a message in the next file, so a maximum of
-                    // two times the flush
                     boolean flushOK = 
CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
-                    for (int i = 0; i < 2 && !flushOK; i++) {
+                    for (int i = 0; i < 1000 && !flushOK; i++) {
                         CommitLog.this.mappedFileQueue.flush(0);
                         flushOK = 
CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
+                        if (flushOK) {
+                            break;
+                        } else {
+                            // When transientStorePoolEnable is true, the 
messages in writeBuffer may not be committed
+                            // to pageCache very quickly, and flushOk here may 
almost be false, so we can sleep 1ms to
+                            // wait for the messages to be committed to 
pageCache.
+                            try {
+                                Thread.sleep(1);
+                            } catch (InterruptedException ignored) {
+                            }
+                        }
                     }
 
                     req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : 
PutMessageStatus.FLUSH_DISK_TIMEOUT);
@@ -1846,7 +1855,7 @@ public class CommitLog implements Swappable {
             // Record ConsumeQueue information
             Long queueOffset = msgInner.getQueueOffset();
 
-            // this msg maybe a inner-batch msg.
+            // this msg maybe an inner-batch msg.
             short messageNum = getMessageNum(msgInner);
 
             // Transaction messages that require special handling

Reply via email to