This is an automated email from the ASF dual-hosted git repository. eldenmoon pushed a commit to branch timeout-future in repository https://gitbox.apache.org/repos/asf/doris.git
commit 9f4ce253bbdd724420b3c52ebbce4d6e55c9d707 Author: eldenmoon <15605149...@163.com> AuthorDate: Sat Jul 8 22:01:21 2023 +0800 add config to modify the number of BackendServiceProxy instances, since under highly concurrent workloads the BRPC channel is frequently blocked --- fe/fe-common/src/main/java/org/apache/doris/common/Config.java | 4 ++++ .../src/main/java/org/apache/doris/qe/PointQueryExec.java | 10 +++++----- .../main/java/org/apache/doris/rpc/BackendServiceProxy.java | 2 +- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index dac63e6cd4..472a49454e 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -399,6 +399,10 @@ public class Config extends ConfigBase { @ConfField(description = {"MySQL 服务的最大任务线程数", "The max number of task threads in MySQL service"}) public static int max_mysql_service_task_threads_num = 4096; + @ConfField(description = {"BackendServiceProxy数量, 用于池化GRPC channel", + "BackendServiceProxy pool size for pooling GRPC channels."}) + public static int backend_proxy_num = 48; + @ConfField(description = { "集群 ID,用于内部认证。通常在集群第一次启动时,会随机生成一个 cluster id. 用户也可以手动指定。", "Cluster id used for internal authentication. 
Usually a random integer generated when master FE " diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java index e46c858c31..b861f129ad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java @@ -185,7 +185,7 @@ public class PointQueryExec { while (pResult == null) { InternalService.PTabletKeyLookupRequest request = requestBuilder.build(); Future<InternalService.PTabletKeyLookupResponse> futureResponse = - BackendServiceProxy.getInstance().fetchTabletDataAsync(backend.getBrpcAdress(), request); + BackendServiceProxy.getInstance().fetchTabletDataAsync(backend.getBrpcAdress(), request); long currentTs = System.currentTimeMillis(); if (currentTs >= timeoutTs) { LOG.warn("fetch result timeout {}", backend.getBrpcAdress()); @@ -193,7 +193,7 @@ public class PointQueryExec { return null; } try { - pResult = futureResponse.get(timeoutTs - currentTs, TimeUnit.MILLISECONDS); + pResult = futureResponse.get((timeoutTs - currentTs) / 3, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { // continue to get result LOG.info("future get interrupted Exception"); @@ -203,18 +203,18 @@ public class PointQueryExec { } } catch (TimeoutException e) { futureResponse.cancel(true); - LOG.warn("fetch result timeout {}", backend.getBrpcAdress()); + LOG.warn("fetch result timeout {}, addr {}", timeoutTs - currentTs, backend.getBrpcAdress()); status.setStatus("query timeout"); return null; } } } catch (RpcException e) { - LOG.warn("fetch result rpc exception {}", backend.getBrpcAdress()); + LOG.warn("fetch result rpc exception {}, e {}", backend.getBrpcAdress(), e); status.setRpcStatus(e.getMessage()); SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage()); return null; } catch (ExecutionException e) { - LOG.warn("fetch result execution exception {}", backend.getBrpcAdress()); + LOG.warn("fetch 
result execution exception {}, addr {}", e, backend.getBrpcAdress()); if (e.getMessage().contains("time out")) { // if timeout, we set error code to TIMEOUT, and it will not retry querying. status.setStatus(new Status(TStatusCode.TIMEOUT, e.getMessage())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index c7c6a144c6..39dfd7915f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -61,7 +61,7 @@ public class BackendServiceProxy { } private static class Holder { - private static final int PROXY_NUM = 20; + private static final int PROXY_NUM = Config.backend_proxy_num; private static BackendServiceProxy[] proxies = new BackendServiceProxy[PROXY_NUM]; private static AtomicInteger count = new AtomicInteger(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org