This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new a79b56ac23d [chore](be) Support config max message size for be thrift 
server (#36595)
a79b56ac23d is described below

commit a79b56ac23d9003f4b32c1f5adb78de1a7758bf9
Author: walter <w41te...@gmail.com>
AuthorDate: Thu Jun 20 20:15:43 2024 +0800

    [chore](be) Support config max message size for be thrift server (#36595)
    
    Cherry-pick #36467
---
 be/src/common/config.cpp           |  3 ++
 be/src/common/config.h             |  3 ++
 be/src/runtime/snapshot_loader.cpp |  2 +-
 be/src/util/thrift_server.cpp      | 67 +++++++++++++++++++++++++-------------
 4 files changed, 52 insertions(+), 23 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index a8abee2f4cd..ba173b0d03f 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -248,6 +248,9 @@ DEFINE_mInt32(thrift_connect_timeout_seconds, "3");
 DEFINE_mInt32(fetch_rpc_timeout_seconds, "30");
 // default thrift client retry interval (in milliseconds)
 DEFINE_mInt64(thrift_client_retry_interval_ms, "1000");
+// max message size of thrift request
+// default: 100 * 1024 * 1024
+DEFINE_mInt64(thrift_max_message_size, "104857600");
 // max row count number for single scan range, used in segmentv1
 DEFINE_mInt32(doris_scan_range_row_count, "524288");
 // max bytes number for single scan range, used in segmentv2
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 865d23000f5..5c60ffae258 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -294,6 +294,9 @@ DECLARE_mInt32(thrift_connect_timeout_seconds);
 DECLARE_mInt32(fetch_rpc_timeout_seconds);
 // default thrift client retry interval (in milliseconds)
 DECLARE_mInt64(thrift_client_retry_interval_ms);
+// max message size of thrift request
+// default: 100 * 1024 * 1024
+DECLARE_mInt64(thrift_max_message_size);
 // max row count number for single scan range, used in segmentv1
 DECLARE_mInt32(doris_scan_range_row_count);
 // max bytes number for single scan range, used in segmentv2
diff --git a/be/src/runtime/snapshot_loader.cpp 
b/be/src/runtime/snapshot_loader.cpp
index f064bd798f7..cab8edb1927 100644
--- a/be/src/runtime/snapshot_loader.cpp
+++ b/be/src/runtime/snapshot_loader.cpp
@@ -117,7 +117,7 @@ Status SnapshotLoader::init(TStorageBackendType::type type, 
const std::string& l
         RETURN_IF_ERROR(io::BrokerFileSystem::create(_broker_addr, _prop, 
&fs));
         _remote_fs = std::move(fs);
     } else {
-        return Status::InternalError("Unknown storage tpye: {}", type);
+        return Status::InternalError("Unknown storage type: {}", type);
     }
     return Status::OK();
 }
diff --git a/be/src/util/thrift_server.cpp b/be/src/util/thrift_server.cpp
index 06e59963130..7844f7daa1e 100644
--- a/be/src/util/thrift_server.cpp
+++ b/be/src/util/thrift_server.cpp
@@ -34,10 +34,12 @@
 // IWYU pragma: no_include <bits/chrono.h>
 #include <chrono> // IWYU pragma: keep
 #include <condition_variable>
+#include <memory>
 #include <mutex>
 #include <sstream>
 #include <thread>
 
+#include "common/config.h"
 #include "service/backend_options.h"
 #include "util/doris_metrics.h"
 
@@ -58,6 +60,28 @@ 
DEFINE_GAUGE_METRIC_PROTOTYPE_3ARG(thrift_current_connections, MetricUnit::CONNE
 DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(thrift_connections_total, 
MetricUnit::CONNECTIONS,
                                      "Total connections made over the lifetime 
of this server");
 
+// Nonblocking server socket implementation of TNonblockingServerTransport: a wrapper around a
+// unix socket's listen and accept calls. Every client socket it creates is given a
+// TConfiguration built from config::thrift_max_message_size.
+class ImprovedNonblockingServerSocket : public apache::thrift::transport::TNonblockingServerSocket {
+    using TConfiguration = apache::thrift::TConfiguration;
+    using TSocket = apache::thrift::transport::TSocket;
+
+public:
+    // `port` is forwarded unchanged to the base class. The configuration is built once here and
+    // handed to every socket produced by createSocket(). The constructor is explicit so that an
+    // int never converts to a server socket implicitly.
+    // NOTE(review): thrift_max_message_size is defined with DEFINE_mInt64; confirm that
+    // TConfiguration's constructor parameter is wide enough, or clamp the value before use.
+    explicit ImprovedNonblockingServerSocket(int port)
+            : TNonblockingServerSocket(port),
+              _config(std::make_shared<TConfiguration>(config::thrift_max_message_size)) {}
+    ~ImprovedNonblockingServerSocket() override = default;
+
+protected:
+    // Overrides the base-class factory so the socket wrapping `clientSocket` carries _config.
+    std::shared_ptr<TSocket> createSocket(THRIFT_SOCKET clientSocket) override {
+        return std::make_shared<TSocket>(clientSocket, _config);
+    }
+
+private:
+    // Leading underscore matches the other members visible in this file and keeps the name
+    // distinct from the `config` namespace referenced in the constructor.
+    std::shared_ptr<TConfiguration> _config;
+};
+
 // Helper class that starts a server in a separate thread, and handles
 // the inter-thread communication to monitor whether it started
 // correctly.
@@ -68,26 +92,26 @@ public:
             : _thrift_server(thrift_server), _signal_fired(false) {}
 
     // friendly to code style
-    virtual ~ThriftServerEventProcessor() {}
+    ~ThriftServerEventProcessor() override = default;
 
     // Called by TNonBlockingServer when server has acquired its resources and 
is ready to
     // serve, and signals to StartAndWaitForServer that start-up is finished.
     // From TServerEventHandler.
-    virtual void preServe();
+    void preServe() override;
 
     // Called when a client connects; we create per-client state and call any
     // SessionHandlerIf handler.
-    virtual void* 
createContext(std::shared_ptr<apache::thrift::protocol::TProtocol> input,
-                                
std::shared_ptr<apache::thrift::protocol::TProtocol> output);
+    void* createContext(std::shared_ptr<apache::thrift::protocol::TProtocol> 
input,
+                        std::shared_ptr<apache::thrift::protocol::TProtocol> 
output) override;
 
     // Called when a client starts an RPC; we set the thread-local session key.
-    virtual void processContext(void* context,
-                                
std::shared_ptr<apache::thrift::transport::TTransport> output);
+    void processContext(void* context,
+                        std::shared_ptr<apache::thrift::transport::TTransport> 
output) override;
 
     // Called when a client disconnects; we call any SessionHandlerIf handler.
-    virtual void deleteContext(void* serverContext,
-                               
std::shared_ptr<apache::thrift::protocol::TProtocol> input,
-                               
std::shared_ptr<apache::thrift::protocol::TProtocol> output);
+    void deleteContext(void* serverContext,
+                       std::shared_ptr<apache::thrift::protocol::TProtocol> 
input,
+                       std::shared_ptr<apache::thrift::protocol::TProtocol> 
output) override;
 
     // Waits for a timeout of TIMEOUT_MS for a server to signal that it has 
started
     // correctly.
@@ -125,8 +149,8 @@ Status 
ThriftServer::ThriftServerEventProcessor::start_and_wait_for_server() {
     std::unique_lock<std::mutex> lock(_signal_lock);
     _thrift_server->_started = false;
 
-    _thrift_server->_server_thread.reset(
-            new 
std::thread(&ThriftServer::ThriftServerEventProcessor::supervise, this));
+    _thrift_server->_server_thread = std::make_unique<std::thread>(
+            &ThriftServer::ThriftServerEventProcessor::supervise, this);
 
     // Loop protects against spurious wakeup. Locks provide necessary fences 
to ensure
     // visibility.
@@ -314,7 +338,7 @@ Status ThriftServer::start() {
     std::shared_ptr<apache::thrift::transport::TServerTransport> 
fe_server_transport;
     std::shared_ptr<apache::thrift::transport::TTransportFactory> 
transport_factory;
     std::shared_ptr<apache::thrift::transport::TNonblockingServerSocket> 
socket =
-            
std::make_shared<apache::thrift::transport::TNonblockingServerSocket>(_port);
+            std::make_shared<ImprovedNonblockingServerSocket>(_port);
     if (_server_type != THREADED) {
         thread_mgr = 
apache::thrift::concurrency::ThreadManager::newSimpleThreadManager(
                 _num_worker_threads);
@@ -328,40 +352,39 @@ Status ThriftServer::start() {
 
     switch (_server_type) {
     case NON_BLOCKING:
-        if (transport_factory.get() == nullptr) {
+        if (transport_factory == nullptr) {
             transport_factory.reset(new 
apache::thrift::transport::TTransportFactory());
         }
 
-        _server.reset(new apache::thrift::server::TNonblockingServer(
+        _server = std::make_unique<apache::thrift::server::TNonblockingServer>(
                 _processor, transport_factory, transport_factory, 
protocol_factory,
-                protocol_factory, socket, thread_mgr));
+                protocol_factory, socket, thread_mgr);
         break;
 
     case THREAD_POOL:
         fe_server_transport.reset(new apache::thrift::transport::TServerSocket(
                 BackendOptions::get_service_bind_address_without_bracket(), 
_port));
 
-        if (transport_factory.get() == nullptr) {
+        if (transport_factory == nullptr) {
             transport_factory.reset(new 
apache::thrift::transport::TBufferedTransportFactory());
         }
 
-        _server.reset(new apache::thrift::server::TThreadPoolServer(
-                _processor, fe_server_transport, transport_factory, 
protocol_factory, thread_mgr));
+        _server = std::make_unique<apache::thrift::server::TThreadPoolServer>(
+                _processor, fe_server_transport, transport_factory, 
protocol_factory, thread_mgr);
         break;
 
     case THREADED:
         server_socket = new apache::thrift::transport::TServerSocket(
                 BackendOptions::get_service_bind_address_without_bracket(), 
_port);
-        //      server_socket->setAcceptTimeout(500);
         fe_server_transport.reset(server_socket);
 
-        if (transport_factory.get() == nullptr) {
+        if (transport_factory == nullptr) {
             transport_factory.reset(new 
apache::thrift::transport::TBufferedTransportFactory());
         }
 
-        _server.reset(new apache::thrift::server::TThreadedServer(
+        _server = std::make_unique<apache::thrift::server::TThreadedServer>(
                 _processor, fe_server_transport, transport_factory, 
protocol_factory,
-                thread_factory));
+                thread_factory);
         break;
 
     default:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to