This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 46962c262c [ISSUE #7523] Message will flush timeout when transientStorePoolEnable=true and flushDiskType=SYNC_FLUSH (#7524) 46962c262c is described below commit 46962c262c37554ff09afe9e02c7baf66a5ecc73 Author: fujian-zfj <2573259...@qq.com> AuthorDate: Thu Nov 2 13:47:16 2023 +0800 [ISSUE #7523] Message will flush timeout when transientStorePoolEnable=true and flushDiskType=SYNC_FLUSH (#7524) * typo int readme[ecosystem] * enableTransientPool and sync_flush will cause flush_time_out * polish * add log --- .../main/java/org/apache/rocketmq/store/CommitLog.java | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 3d3ee86b8f..6c3afde70f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -1634,12 +1634,21 @@ public class CommitLog implements Swappable { private void doCommit() { if (!this.requestsRead.isEmpty()) { for (GroupCommitRequest req : this.requestsRead) { - // There may be a message in the next file, so a maximum of - // two times the flush boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); - for (int i = 0; i < 2 && !flushOK; i++) { + for (int i = 0; i < 1000 && !flushOK; i++) { CommitLog.this.mappedFileQueue.flush(0); flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); + if (flushOK) { + break; + } else { + // When transientStorePoolEnable is true, the messages in writeBuffer may not be committed + // to pageCache very quickly, and flushOk here may almost be false, so we can sleep 1ms to + // wait for the messages to be committed to pageCache. + try { + Thread.sleep(1); + } catch (InterruptedException ignored) { + } + } } req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT); @@ -1846,7 +1855,7 @@ public class CommitLog implements Swappable { // Record ConsumeQueue information Long queueOffset = msgInner.getQueueOffset(); - // this msg maybe a inner-batch msg. + // this msg maybe an inner-batch msg. short messageNum = getMessageNum(msgInner); // Transaction messages that require special handling