This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new a1bf49d5d0 [ISSUE #7093] Avoid dispatch tasks too much cause dispatch 
task failed (#7094)
a1bf49d5d0 is described below

commit a1bf49d5d07cf64374bc3dde5ab43add831433ad
Author: lizhimins <[email protected]>
AuthorDate: Tue Aug 1 15:56:34 2023 +0800

    [ISSUE #7093] Avoid dispatch tasks too much cause dispatch task failed 
(#7094)
    
    * Avoid dispatch tasks too much cause dispatch task failed
    
    * set schedule task async
---
 .../org/apache/rocketmq/tieredstore/TieredDispatcher.java     | 11 ++++++-----
 .../rocketmq/tieredstore/common/TieredStoreExecutor.java      |  2 +-
 2 files changed, 7 insertions(+), 6 deletions(-)

diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
index 523b0c2cde..bb58ea7dd5 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
@@ -82,7 +82,7 @@ public class TieredDispatcher extends ServiceThread 
implements CommitLogDispatch
         TieredStoreExecutor.commonScheduledExecutor.scheduleWithFixedDelay(() 
->
             tieredFlatFileManager.deepCopyFlatFileToList().forEach(flatFile -> 
{
                 if (!flatFile.getCompositeFlatFileLock().isLocked()) {
-                    dispatchFlatFile(flatFile);
+                    dispatchFlatFileAsync(flatFile);
                 }
             }), 30, 10, TimeUnit.SECONDS);
     }
@@ -180,10 +180,6 @@ public class TieredDispatcher extends ServiceThread 
implements CommitLogDispatch
                 message.release();
                 flatFile.getCompositeFlatFileLock().unlock();
             }
-        } else {
-            if (!flatFile.getCompositeFlatFileLock().isLocked()) {
-                this.dispatchFlatFileAsync(flatFile);
-            }
         }
     }
 
@@ -199,6 +195,11 @@ public class TieredDispatcher extends ServiceThread 
implements CommitLogDispatch
     }
 
     public void dispatchFlatFileAsync(CompositeQueueFlatFile flatFile, 
Consumer<Long> consumer) {
+        // Avoid dispatch tasks too much
+        if (TieredStoreExecutor.dispatchThreadPoolQueue.size() >
+            TieredStoreExecutor.QUEUE_CAPACITY * 0.75) {
+            return;
+        }
         TieredStoreExecutor.dispatchExecutor.execute(() -> {
             try {
                 dispatchFlatFile(flatFile);
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
index 23f1b01eac..6eb3478b3d 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredStoreExecutor.java
@@ -27,7 +27,7 @@ import org.apache.rocketmq.common.ThreadFactoryImpl;
 
 public class TieredStoreExecutor {
 
-    private static final int QUEUE_CAPACITY = 10000;
+    public static final int QUEUE_CAPACITY = 10000;
 
     // Visible for monitor
     public static BlockingQueue<Runnable> dispatchThreadPoolQueue;

Reply via email to