This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new a9939c09c15 branch-2.1: [improve](thrift) Config
thrift_max_message_size for FE SIMPLE and TH… #49678 (#49725)
a9939c09c15 is described below
commit a9939c09c1584254a6d1bf887df45b21037aee9f
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Apr 1 17:03:48 2025 +0800
branch-2.1: [improve](thrift) Config thrift_max_message_size for FE SIMPLE
and TH… #49678 (#49725)
Cherry-picked from #49678
Co-authored-by: walter <[email protected]>
---
.../java/org/apache/doris/common/ThriftServer.java | 51 +++++++++++++++++++++-
1 file changed, 49 insertions(+), 2 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ThriftServer.java
b/fe/fe-core/src/main/java/org/apache/doris/common/ThriftServer.java
index f18dbb378a1..cdc4bba71d3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ThriftServer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThriftServer.java
@@ -31,6 +31,7 @@ import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TNonblockingSocket;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
@@ -95,14 +96,42 @@ public class ThriftServer {
}
private void createSimpleServer() throws TTransportException {
- TServer.Args args = new TServer.Args(new
TServerSocket(port)).protocolFactory(
+ TServerSocket.ServerSocketTransportArgs socketTransportArgs;
+
+ if (FrontendOptions.isBindIPV6()) {
+ socketTransportArgs = new TServerSocket.ServerSocketTransportArgs()
+ .bindAddr(new InetSocketAddress("::0", port))
+ .clientTimeout(Config.thrift_client_timeout_ms)
+ .backlog(Config.thrift_backlog_num);
+ } else {
+ socketTransportArgs = new TServerSocket.ServerSocketTransportArgs()
+ .bindAddr(new InetSocketAddress("0.0.0.0", port))
+ .clientTimeout(Config.thrift_client_timeout_ms)
+ .backlog(Config.thrift_backlog_num);
+ }
+
+ TServer.Args args = new TServer.Args(new
ImprovedTServerSocket(socketTransportArgs)).protocolFactory(
new TBinaryProtocol.Factory()).processor(processor);
server = new TSimpleServer(args);
}
private void createThreadedServer() throws TTransportException {
+ TNonblockingServerSocket.NonblockingAbstractServerSocketArgs
socketTransportArgs;
+
+ if (FrontendOptions.isBindIPV6()) {
+ socketTransportArgs = new
TNonblockingServerSocket.NonblockingAbstractServerSocketArgs()
+ .bindAddr(new InetSocketAddress("::0", port))
+ .clientTimeout(Config.thrift_client_timeout_ms)
+ .backlog(Config.thrift_backlog_num);
+ } else {
+ socketTransportArgs = new
TNonblockingServerSocket.NonblockingAbstractServerSocketArgs()
+ .bindAddr(new InetSocketAddress("0.0.0.0", port))
+ .clientTimeout(Config.thrift_client_timeout_ms)
+ .backlog(Config.thrift_backlog_num);
+ }
+
TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(
- new TNonblockingServerSocket(port,
Config.thrift_client_timeout_ms))
+ new ImprovedTNonblockingServerSocket(socketTransportArgs))
.protocolFactory(new TBinaryProtocol.Factory())
.processor(processor);
ThreadPoolExecutor threadPoolExecutor =
ThreadPoolManager.newDaemonCacheThreadPool(
@@ -214,4 +243,22 @@ public class ThriftServer {
return socket;
}
}
+
+ static class ImprovedTNonblockingServerSocket extends
TNonblockingServerSocket {
+ public
ImprovedTNonblockingServerSocket(NonblockingAbstractServerSocketArgs args)
throws TTransportException {
+ super(args);
+ }
+
+ @Override
+ public TNonblockingSocket accept() throws TTransportException {
+ TNonblockingSocket socket = super.accept();
+
+ TConfiguration cfg = socket.getConfiguration();
+ cfg.setMaxMessageSize(Config.thrift_max_message_size);
+ cfg.setMaxFrameSize(Config.thrift_max_frame_size);
+
+ socket.updateKnownMessageSize(0); // Since we update the
configuration, reset consumed message size.
+ return socket;
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org