This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 4ae9451b637 branch-2.1: [opt](connection) add connection num in error 
msg (#49471) (#49910)
4ae9451b637 is described below

commit 4ae9451b63744b938ae7d3959efbf310bba85803
Author: Mingyu Chen (Rayner) <morning...@163.com>
AuthorDate: Thu Apr 10 07:00:51 2025 -0700

    branch-2.1: [opt](connection) add connection num in error msg (#49471) 
(#49910)
    
    bp #49471
---
 .../java/org/apache/doris/mysql/AcceptListener.java  | 11 +++++++----
 .../java/org/apache/doris/qe/ConnectScheduler.java   | 14 ++++++++++----
 .../sessions/FlightSessionsWithTokenManager.java     | 20 ++++++++++++++------
 3 files changed, 31 insertions(+), 14 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java 
b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
index 3d783f28cb3..0388a532efd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
@@ -89,15 +89,18 @@ public class AcceptListener implements 
ChannelListener<AcceptingChannel<StreamCo
                         if (!MysqlProto.negotiate(context)) {
                             throw new AfterConnectedException("mysql negotiate 
failed");
                         }
-                        if (connectScheduler.registerConnection(context)) {
+                        int res = connectScheduler.registerConnection(context);
+                        if (res == -1) {
                             MysqlProto.sendResponsePacket(context);
                             connection.setCloseListener(
                                     streamConnection -> 
connectScheduler.unregisterConnection(context));
                         } else {
-                            
context.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS,
-                                    "Reach limit of connections");
+                            long userConnLimit = 
context.getEnv().getAuth().getMaxConn(context.getQualifiedUser());
+                            String errMsg = String.format("Reach limit of 
connections. Total: %d, User: %d, Current: %d",
+                                    connectScheduler.getMaxConnections(), 
userConnLimit, res);
+                            
context.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS, errMsg);
                             MysqlProto.sendResponsePacket(context);
-                            throw new AfterConnectedException("Reach limit of 
connections");
+                            throw new AfterConnectedException(errMsg);
                         }
                         context.setStartTime();
                         int userQueryTimeout = 
context.getEnv().getAuth().getQueryTimeout(context.getQualifiedUser());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
index e4626a0d215..1618ca2dd43 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
@@ -90,10 +90,12 @@ public class ConnectScheduler {
     }
 
     // Register one connection with its connection id.
-    public boolean registerConnection(ConnectContext ctx) {
+    // Returns -1 when the connection is registered successfully.
+    // Returns a value >= 0 when registration fails; that value is the current 
connection num.
+    public int registerConnection(ConnectContext ctx) {
         if (numberConnection.incrementAndGet() > maxConnections) {
             numberConnection.decrementAndGet();
-            return false;
+            return numberConnection.get();
         }
         // Check user
         connByUser.putIfAbsent(ctx.getQualifiedUser(), new AtomicInteger(0));
@@ -101,13 +103,13 @@ public class ConnectScheduler {
         if (conns.incrementAndGet() > 
ctx.getEnv().getAuth().getMaxConn(ctx.getQualifiedUser())) {
             conns.decrementAndGet();
             numberConnection.decrementAndGet();
-            return false;
+            return numberConnection.get();
         }
         connectionMap.put(ctx.getConnectionId(), ctx);
         if (ctx.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) {
             flightToken2ConnectionId.put(ctx.getPeerIdentity(), 
ctx.getConnectionId());
         }
-        return true;
+        return -1;
     }
 
     public void unregisterConnection(ConnectContext ctx) {
@@ -202,4 +204,8 @@ public class ConnectScheduler {
     public Map<Integer, ConnectContext> getConnectionMap() {
         return connectionMap;
     }
+
+    public int getMaxConnections() {
+        return maxConnections;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java
index 75d7ff1b334..a495b02c393 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java
@@ -20,6 +20,7 @@ package org.apache.doris.service.arrowflight.sessions;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.ConnectScheduler;
 import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.arrowflight.tokens.FlightTokenDetails;
 import org.apache.doris.service.arrowflight.tokens.FlightTokenManager;
@@ -65,12 +66,19 @@ public class FlightSessionsWithTokenManager implements 
FlightSessionsManager {
         flightTokenDetails.setCreatedSession(true);
         ConnectContext connectContext = 
FlightSessionsManager.buildConnectContext(peerIdentity,
                 flightTokenDetails.getUserIdentity(), 
flightTokenDetails.getRemoteIp());
-        ExecuteEnv.getInstance().getScheduler().submit(connectContext);
-        if 
(!ExecuteEnv.getInstance().getScheduler().registerConnection(connectContext)) {
-            String err = "Reach limit of connections, increase 
`qe_max_connection` in fe.conf, or decrease "
-                    + "`arrow_flight_token_cache_size` to evict unused bearer 
tokens and it connections faster";
-            
connectContext.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS, 
err);
-            throw new IllegalArgumentException(err);
+        ConnectScheduler connectScheduler = 
ExecuteEnv.getInstance().getScheduler();
+        connectScheduler.submit(connectContext);
+        int res = connectScheduler.registerConnection(connectContext);
+        if (res >= 0) {
+            long userConnLimit = 
connectContext.getEnv().getAuth().getMaxConn(connectContext.getQualifiedUser());
+            String errMsg = String.format(
+                    "Reach limit of connections. Total: %d, User: %d, Current: 
%d. "
+                            + "Increase `qe_max_connection` in fe.conf or 
user's `max_user_connections`,"
+                            + " or decrease `arrow_flight_token_cache_size` "
+                            + "to evict unused bearer tokens and it 
connections faster",
+                    connectScheduler.getMaxConnections(), userConnLimit, res);
+            
connectContext.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS, 
errMsg);
+            throw new IllegalArgumentException(errMsg);
         }
         return connectContext;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to