This is an automated email from the ASF dual-hosted git repository.

eldenmoon pushed a commit to branch timeout-future
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 9f4ce253bbdd724420b3c52ebbce4d6e55c9d707
Author: eldenmoon <15605149...@163.com>
AuthorDate: Sat Jul 8 22:01:21 2023 +0800

    add config to modify numof BackendServiceProxy since under high concurrent 
workd load BRPC channel will be blocked frenquently
---
 fe/fe-common/src/main/java/org/apache/doris/common/Config.java |  4 ++++
 .../src/main/java/org/apache/doris/qe/PointQueryExec.java      | 10 +++++-----
 .../main/java/org/apache/doris/rpc/BackendServiceProxy.java    |  2 +-
 3 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index dac63e6cd4..472a49454e 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -399,6 +399,10 @@ public class Config extends ConfigBase {
     @ConfField(description = {"MySQL 服务的最大任务线程数", "The max number of task 
threads in MySQL service"})
     public static int max_mysql_service_task_threads_num = 4096;
 
+    @ConfField(description = {"BackendServiceProxy数量, 用于池化GRPC channel",
+            "BackendServiceProxy pool size for pooling GRPC channels."})
+    public static int backend_proxy_num = 48;
+
     @ConfField(description = {
             "集群 ID,用于内部认证。通常在集群第一次启动时,会随机生成一个 cluster id. 用户也可以手动指定。",
             "Cluster id used for internal authentication. Usually a random 
integer generated when master FE "
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
index e46c858c31..b861f129ad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
@@ -185,7 +185,7 @@ public class PointQueryExec {
             while (pResult == null) {
                 InternalService.PTabletKeyLookupRequest request = 
requestBuilder.build();
                 Future<InternalService.PTabletKeyLookupResponse> 
futureResponse =
-                        
BackendServiceProxy.getInstance().fetchTabletDataAsync(backend.getBrpcAdress(), 
request);
+                         
BackendServiceProxy.getInstance().fetchTabletDataAsync(backend.getBrpcAdress(), 
request);
                 long currentTs = System.currentTimeMillis();
                 if (currentTs >= timeoutTs) {
                     LOG.warn("fetch result timeout {}", 
backend.getBrpcAdress());
@@ -193,7 +193,7 @@ public class PointQueryExec {
                     return null;
                 }
                 try {
-                    pResult = futureResponse.get(timeoutTs - currentTs, 
TimeUnit.MILLISECONDS);
+                    pResult = futureResponse.get((timeoutTs - currentTs) / 3, 
TimeUnit.MILLISECONDS);
                 } catch (InterruptedException e) {
                     // continue to get result
                     LOG.info("future get interrupted Exception");
@@ -203,18 +203,18 @@ public class PointQueryExec {
                     }
                 } catch (TimeoutException e) {
                     futureResponse.cancel(true);
-                    LOG.warn("fetch result timeout {}", 
backend.getBrpcAdress());
+                    LOG.warn("fetch result timeout {}, addr {}", timeoutTs - 
currentTs, backend.getBrpcAdress());
                     status.setStatus("query timeout");
                     return null;
                 }
             }
         } catch (RpcException e) {
-            LOG.warn("fetch result rpc exception {}", backend.getBrpcAdress());
+            LOG.warn("fetch result rpc exception {}, e {}", 
backend.getBrpcAdress(), e);
             status.setRpcStatus(e.getMessage());
             SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage());
             return null;
         } catch (ExecutionException e) {
-            LOG.warn("fetch result execution exception {}", 
backend.getBrpcAdress());
+            LOG.warn("fetch result execution exception {}, addr {}", e, 
backend.getBrpcAdress());
             if (e.getMessage().contains("time out")) {
                 // if timeout, we set error code to TIMEOUT, and it will not 
retry querying.
                 status.setStatus(new Status(TStatusCode.TIMEOUT, 
e.getMessage()));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java 
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index c7c6a144c6..39dfd7915f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -61,7 +61,7 @@ public class BackendServiceProxy {
     }
 
     private static class Holder {
-        private static final int PROXY_NUM = 20;
+        private static final int PROXY_NUM = Config.backend_proxy_num;
         private static BackendServiceProxy[] proxies = new 
BackendServiceProxy[PROXY_NUM];
         private static AtomicInteger count = new AtomicInteger();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to