This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7d7eb73e03 [ISSUE #10003] Add gRPC maxConcurrentCallsPerConnection 
Configuration to Proxy (#10004)
7d7eb73e03 is described below

commit 7d7eb73e035a7a172983396f018ce1726464528b
Author: qianye <[email protected]>
AuthorDate: Mon Mar 30 14:27:01 2026 +0800

    [ISSUE #10003] Add gRPC maxConcurrentCallsPerConnection Configuration to 
Proxy (#10004)
---
 .../rocketmq/common/thread/ThreadPoolMonitor.java    |  7 ++++---
 .../apache/rocketmq/proxy/config/ProxyConfig.java    | 19 +++++++++++++++++++
 .../rocketmq/proxy/grpc/GrpcServerBuilder.java       | 20 +++++++++++---------
 .../proxy/remoting/RemotingProtocolServer.java       | 13 ++++++-------
 proxy/src/main/resources/rmq.proxy.logback.xml       |  2 +-
 5 files changed, 41 insertions(+), 20 deletions(-)

diff --git 
a/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java 
b/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java
index a79674568b..02acd78ba1 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/thread/ThreadPoolMonitor.java
@@ -142,9 +142,10 @@ public class ThreadPoolMonitor {
             List<ThreadPoolStatusMonitor> monitors = 
threadPoolWrapper.getStatusPrinters();
             for (ThreadPoolStatusMonitor monitor : monitors) {
                 double value = 
monitor.value(threadPoolWrapper.getThreadPoolExecutor());
-                String nameFormatted = String.format("%-40s", 
threadPoolWrapper.getName());
-                String descFormatted = String.format("%-12s", 
monitor.describe());
-                waterMarkLogger.info("{}{}{}", nameFormatted, descFormatted, 
value);
+                waterMarkLogger.info("\t{}\t{}\t{}", 
threadPoolWrapper.getName(),
+                    monitor.describe(),
+                    value);
+
                 if (enablePrintJstack) {
                     if 
(monitor.needPrintJstack(threadPoolWrapper.getThreadPoolExecutor(), value) &&
                             System.currentTimeMillis() - jstackTime > 
jstackPeriodTime) {
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java 
b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index d44b82aff5..5a1a585930 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -95,6 +95,17 @@ public class ProxyConfig implements ConfigFile {
     private boolean enableGrpcEpoll = false;
     private int grpcThreadPoolNums = 16 + PROCESSOR_NUMBER * 2;
     private int grpcThreadPoolQueueCapacity = 100000;
+
+    /**
+     * Maximum number of concurrent gRPC calls allowed per client connection.
+     * <p>
+     * A single client issuing excessively high concurrent requests may skew 
the validation load balancing
+     * and overload a single proxy instance (hotspot), potentially bringing it 
down. Limiting
+     * {@code grpcMaxConcurrentCallsPerConnection} helps mitigate this 
per-connection hotspot risk.
+     * <p>
+     * Note: Setting this limit too low may cause send/consume failures (e.g., 
backpressure or rejected calls).
+     */
+    private int grpcMaxConcurrentCallsPerConnection = Integer.MAX_VALUE;
     private String brokerConfigPath = ConfigurationManager.getProxyHome() + 
"/conf/broker.conf";
     /**
      * gRPC max message size
@@ -1581,4 +1592,12 @@ public class ProxyConfig implements ConfigFile {
     public void setReturnHandleGroupThreadPoolNums(int 
returnHandleGroupThreadPoolNums) {
         this.returnHandleGroupThreadPoolNums = returnHandleGroupThreadPoolNums;
     }
+
+    public int getGrpcMaxConcurrentCallsPerConnection() {
+        return grpcMaxConcurrentCallsPerConnection;
+    }
+
+    public void setGrpcMaxConcurrentCallsPerConnection(int 
grpcMaxConcurrentCallsPerConnection) {
+        this.grpcMaxConcurrentCallsPerConnection = 
grpcMaxConcurrentCallsPerConnection;
+    }
 }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
index 163e799f41..1f012e6f40 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
@@ -24,16 +24,16 @@ import 
io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoopGroup;
 import io.grpc.netty.shaded.io.netty.channel.epoll.EpollServerSocketChannel;
 import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup;
 import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioServerSocketChannel;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.config.ProxyConfig;
 import org.apache.rocketmq.proxy.grpc.interceptor.ContextInterceptor;
 import org.apache.rocketmq.proxy.grpc.interceptor.GlobalExceptionInterceptor;
 import org.apache.rocketmq.proxy.grpc.interceptor.HeaderInterceptor;
-
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.proxy.service.cert.TlsCertificateManager;
 
 public class GrpcServerBuilder {
@@ -52,18 +52,20 @@ public class GrpcServerBuilder {
     }
 
     protected GrpcServerBuilder(ThreadPoolExecutor executor, int port, 
TlsCertificateManager tlsCertificateManager) {
+        ProxyConfig config = ConfigurationManager.getProxyConfig();
         this.tlsCertificateManager = tlsCertificateManager;
-        serverBuilder = NettyServerBuilder.forPort(port);
+        serverBuilder = NettyServerBuilder.forPort(port)
+            
.maxConcurrentCallsPerConnection(config.getGrpcMaxConcurrentCallsPerConnection());
 
         serverBuilder.protocolNegotiator(new ProxyAndTlsProtocolNegotiator());
 
         // build server
-        int bossLoopNum = 
ConfigurationManager.getProxyConfig().getGrpcBossLoopNum();
-        int workerLoopNum = 
ConfigurationManager.getProxyConfig().getGrpcWorkerLoopNum();
-        int maxInboundMessageSize = 
ConfigurationManager.getProxyConfig().getGrpcMaxInboundMessageSize();
-        long idleTimeMills = 
ConfigurationManager.getProxyConfig().getGrpcClientIdleTimeMills();
+        int bossLoopNum = config.getGrpcBossLoopNum();
+        int workerLoopNum = config.getGrpcWorkerLoopNum();
+        int maxInboundMessageSize = config.getGrpcMaxInboundMessageSize();
+        long idleTimeMills = config.getGrpcClientIdleTimeMills();
 
-        if (ConfigurationManager.getProxyConfig().isEnableGrpcEpoll()) {
+        if (config.isEnableGrpcEpoll()) {
             serverBuilder.bossEventLoopGroup(new 
EpollEventLoopGroup(bossLoopNum))
                 .workerEventLoopGroup(new EpollEventLoopGroup(workerLoopNum))
                 .channelType(EpollServerSocketChannel.class)
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
index a01c23fce6..c26f6bc2ef 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
@@ -19,6 +19,11 @@ package org.apache.rocketmq.proxy.remoting;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.channel.Channel;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.auth.config.AuthConfig;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.future.FutureTaskExt;
@@ -59,12 +64,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RequestCode;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
 
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
 public class RemotingProtocolServer implements StartAndShutdown, 
RemotingProxyOutClient {
     private final static Logger log = 
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
 
@@ -194,7 +193,7 @@ public class RemotingProtocolServer implements 
StartAndShutdown, RemotingProxyOu
         this.timerExecutor = ThreadUtils.newSingleThreadScheduledExecutor(
             new 
ThreadFactoryBuilder().setNameFormat("RemotingServerScheduler-%d").build()
         );
-        this.timerExecutor.scheduleAtFixedRate(this::cleanExpireRequest, 10, 
10, TimeUnit.SECONDS);
+        this.timerExecutor.scheduleAtFixedRate(this::cleanExpireRequest, 100, 
100, TimeUnit.MILLISECONDS);
 
         this.registerRemotingServer(this.defaultRemotingServer);
     }
diff --git a/proxy/src/main/resources/rmq.proxy.logback.xml 
b/proxy/src/main/resources/rmq.proxy.logback.xml
index aee4cbc71b..3eccf5f023 100644
--- a/proxy/src/main/resources/rmq.proxy.logback.xml
+++ b/proxy/src/main/resources/rmq.proxy.logback.xml
@@ -52,7 +52,7 @@
             <maxFileSize>128MB</maxFileSize>
         </triggeringPolicy>
         <encoder>
-            <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %m%n</pattern>
+            <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8}%m%n</pattern>
             <charset class="java.nio.charset.Charset">UTF-8</charset>
         </encoder>
     </appender>

Reply via email to