This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 58cc1dca7f5 [improve](fe) Support to config max msg/frame size of the thrift server (#36594) 58cc1dca7f5 is described below commit 58cc1dca7f562c20536b191331ef2c38afe8debf Author: walter <w41te...@gmail.com> AuthorDate: Fri Jun 21 00:15:15 2024 +0800 [improve](fe) Support to config max msg/frame size of the thrift server (#36594) Cherry-pick #35845 --- .../main/java/org/apache/doris/common/Config.java | 10 ++++ .../java/org/apache/doris/common/ThriftServer.java | 61 ++++++++++++++++++---- 2 files changed, 60 insertions(+), 11 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index f8ff3cd5d47..6adf03c56cd 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -405,6 +405,16 @@ public class Config extends ConfigBase { "The connection timeout of thrift client, in milliseconds. 0 means no timeout."}) public static int thrift_client_timeout_ms = 0; + // The default value is inherited from org.apache.thrift.TConfiguration + @ConfField(description = {"thrift server 接收请求大小的上限", + "The maximum size of a (received) message of the thrift server, in bytes"}) + public static int thrift_max_message_size = 100 * 1024 * 1024; + + // The default value is inherited from org.apache.thrift.TConfiguration + @ConfField(description = {"thrift server transport 接收的每帧数据大小的上限", + "The limits of the size of one frame of thrift server transport"}) + public static int thrift_max_frame_size = 16384000; + @ConfField(description = {"thrift server 的 backlog 数量。" + "如果调大这个值,则需同时调整 /proc/sys/net/core/somaxconn 的值", "The backlog number of thrift server. " diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ThriftServer.java b/fe/fe-core/src/main/java/org/apache/doris/common/ThriftServer.java index 2396dc95074..f18dbb378a1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/ThriftServer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThriftServer.java @@ -23,6 +23,7 @@ import org.apache.doris.thrift.TNetworkAddress; import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.thrift.TConfiguration; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.server.TServer; @@ -31,10 +32,13 @@ import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.server.TThreadedSelectorServer; import org.apache.thrift.transport.TNonblockingServerSocket; import org.apache.thrift.transport.TServerSocket; +import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransportException; import java.io.IOException; import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; import java.util.Set; import java.util.concurrent.ThreadPoolExecutor; @@ -98,8 +102,9 @@ public class ThriftServer { private void createThreadedServer() throws TTransportException { TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args( - new TNonblockingServerSocket(port, Config.thrift_client_timeout_ms)).protocolFactory( - new TBinaryProtocol.Factory()).processor(processor); + new TNonblockingServerSocket(port, Config.thrift_client_timeout_ms)) + .protocolFactory(new TBinaryProtocol.Factory()) + .processor(processor); ThreadPoolExecutor threadPoolExecutor = ThreadPoolManager.newDaemonCacheThreadPool( Config.thrift_server_max_worker_threads, "thrift-server-pool", true); args.executorService(threadPoolExecutor); @@ -111,19 +116,19 @@ public class ThriftServer { if (FrontendOptions.isBindIPV6()) { socketTransportArgs = new TServerSocket.ServerSocketTransportArgs() - .bindAddr(new InetSocketAddress("::0", port)) - .clientTimeout(Config.thrift_client_timeout_ms) - .backlog(Config.thrift_backlog_num); + .bindAddr(new InetSocketAddress("::0", port)) + .clientTimeout(Config.thrift_client_timeout_ms) + .backlog(Config.thrift_backlog_num); } else { socketTransportArgs = new TServerSocket.ServerSocketTransportArgs() - .bindAddr(new InetSocketAddress("0.0.0.0", port)) - .clientTimeout(Config.thrift_client_timeout_ms) - .backlog(Config.thrift_backlog_num); + .bindAddr(new InetSocketAddress("0.0.0.0", port)) + .clientTimeout(Config.thrift_client_timeout_ms) + .backlog(Config.thrift_backlog_num); } - TThreadPoolServer.Args serverArgs = - new TThreadPoolServer.Args(new TServerSocket(socketTransportArgs)).protocolFactory( - new TBinaryProtocol.Factory()).processor(processor); + TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(new ImprovedTServerSocket(socketTransportArgs)) + .protocolFactory(new TBinaryProtocol.Factory()) + .processor(processor); ThreadPoolExecutor threadPoolExecutor = ThreadPoolManager.newDaemonCacheThreadPool( Config.thrift_server_max_worker_threads, "thrift-server-pool", true); serverArgs.executorService(threadPoolExecutor); @@ -175,4 +180,38 @@ public class ThriftServer { public void removeConnect(TNetworkAddress clientAddress) { connects.remove(clientAddress); } + + static class ImprovedTServerSocket extends TServerSocket { + public ImprovedTServerSocket(ServerSocketTransportArgs args) throws TTransportException { + super(args); + } + + public TSocket accept() throws TTransportException { + ServerSocket serverSocket = getServerSocket(); + if (serverSocket == null) { + throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket."); + } + + Socket result; + try { + result = serverSocket.accept(); + } catch (Exception e) { + throw new TTransportException(e); + } + if (result == null) { + throw new TTransportException("Blocking server's accept() may not return NULL"); + } + + TSocket socket = new TSocket(result); + + TConfiguration cfg = socket.getConfiguration(); + cfg.setMaxMessageSize(Config.thrift_max_message_size); + cfg.setMaxFrameSize(Config.thrift_max_frame_size); + + socket.updateKnownMessageSize(0); // Since we update the configuration, reset consumed message size. + socket.setTimeout(Config.thrift_client_timeout_ms); + + return socket; + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org