This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new a79b56ac23d [chore](be) Support config max message size for be thrift server (#36595) a79b56ac23d is described below commit a79b56ac23d9003f4b32c1f5adb78de1a7758bf9 Author: walter <w41te...@gmail.com> AuthorDate: Thu Jun 20 20:15:43 2024 +0800 [chore](be) Support config max message size for be thrift server (#36595) Cherry-pick #36467 --- be/src/common/config.cpp | 3 ++ be/src/common/config.h | 3 ++ be/src/runtime/snapshot_loader.cpp | 2 +- be/src/util/thrift_server.cpp | 67 +++++++++++++++++++++++++------------- 4 files changed, 52 insertions(+), 23 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index a8abee2f4cd..ba173b0d03f 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -248,6 +248,9 @@ DEFINE_mInt32(thrift_connect_timeout_seconds, "3"); DEFINE_mInt32(fetch_rpc_timeout_seconds, "30"); // default thrift client retry interval (in milliseconds) DEFINE_mInt64(thrift_client_retry_interval_ms, "1000"); +// max message size of thrift request +// default: 100 * 1024 * 1024 +DEFINE_mInt64(thrift_max_message_size, "104857600"); // max row count number for single scan range, used in segmentv1 DEFINE_mInt32(doris_scan_range_row_count, "524288"); // max bytes number for single scan range, used in segmentv2 diff --git a/be/src/common/config.h b/be/src/common/config.h index 865d23000f5..5c60ffae258 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -294,6 +294,9 @@ DECLARE_mInt32(thrift_connect_timeout_seconds); DECLARE_mInt32(fetch_rpc_timeout_seconds); // default thrift client retry interval (in milliseconds) DECLARE_mInt64(thrift_client_retry_interval_ms); +// max message size of thrift request +// default: 100 * 1024 * 1024 +DECLARE_mInt64(thrift_max_message_size); // max row count number for single scan range, used in segmentv1 DECLARE_mInt32(doris_scan_range_row_count); // max bytes number for single scan range, used in segmentv2 diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp index f064bd798f7..cab8edb1927 100644 --- a/be/src/runtime/snapshot_loader.cpp +++ b/be/src/runtime/snapshot_loader.cpp @@ -117,7 +117,7 @@ Status SnapshotLoader::init(TStorageBackendType::type type, const std::string& l RETURN_IF_ERROR(io::BrokerFileSystem::create(_broker_addr, _prop, &fs)); _remote_fs = std::move(fs); } else { - return Status::InternalError("Unknown storage tpye: {}", type); + return Status::InternalError("Unknown storage type: {}", type); } return Status::OK(); } diff --git a/be/src/util/thrift_server.cpp b/be/src/util/thrift_server.cpp index 06e59963130..7844f7daa1e 100644 --- a/be/src/util/thrift_server.cpp +++ b/be/src/util/thrift_server.cpp @@ -34,10 +34,12 @@ // IWYU pragma: no_include <bits/chrono.h> #include <chrono> // IWYU pragma: keep #include <condition_variable> +#include <memory> #include <mutex> #include <sstream> #include <thread> +#include "common/config.h" #include "service/backend_options.h" #include "util/doris_metrics.h" @@ -58,6 +60,28 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_3ARG(thrift_current_connections, MetricUnit::CONNE DEFINE_COUNTER_METRIC_PROTOTYPE_3ARG(thrift_connections_total, MetricUnit::CONNECTIONS, "Total connections made over the lifetime of this server"); +// Nonblocking Server socket implementation of TNonblockingServerTransport. Wrapper around a unix +// socket listen and accept calls. +class ImprovedNonblockingServerSocket : public apache::thrift::transport::TNonblockingServerSocket { + using TConfiguration = apache::thrift::TConfiguration; + using TSocket = apache::thrift::transport::TSocket; + +public: + // Constructor. + ImprovedNonblockingServerSocket(int port) + : TNonblockingServerSocket(port), + config(std::make_shared<TConfiguration>(config::thrift_max_message_size)) {} + ~ImprovedNonblockingServerSocket() override = default; + +protected: + std::shared_ptr<TSocket> createSocket(THRIFT_SOCKET clientSocket) override { + return std::make_shared<TSocket>(clientSocket, config); + } + +private: + std::shared_ptr<TConfiguration> config; +}; + // Helper class that starts a server in a separate thread, and handles // the inter-thread communication to monitor whether it started // correctly. @@ -68,26 +92,26 @@ public: : _thrift_server(thrift_server), _signal_fired(false) {} // friendly to code style - virtual ~ThriftServerEventProcessor() {} + ~ThriftServerEventProcessor() override = default; // Called by TNonBlockingServer when server has acquired its resources and is ready to // serve, and signals to StartAndWaitForServer that start-up is finished. // From TServerEventHandler. - virtual void preServe(); + void preServe() override; // Called when a client connects; we create per-client state and call any // SessionHandlerIf handler. - virtual void* createContext(std::shared_ptr<apache::thrift::protocol::TProtocol> input, - std::shared_ptr<apache::thrift::protocol::TProtocol> output); + void* createContext(std::shared_ptr<apache::thrift::protocol::TProtocol> input, + std::shared_ptr<apache::thrift::protocol::TProtocol> output) override; // Called when a client starts an RPC; we set the thread-local session key. - virtual void processContext(void* context, - std::shared_ptr<apache::thrift::transport::TTransport> output); + void processContext(void* context, + std::shared_ptr<apache::thrift::transport::TTransport> output) override; // Called when a client disconnects; we call any SessionHandlerIf handler. - virtual void deleteContext(void* serverContext, - std::shared_ptr<apache::thrift::protocol::TProtocol> input, - std::shared_ptr<apache::thrift::protocol::TProtocol> output); + void deleteContext(void* serverContext, + std::shared_ptr<apache::thrift::protocol::TProtocol> input, + std::shared_ptr<apache::thrift::protocol::TProtocol> output) override; // Waits for a timeout of TIMEOUT_MS for a server to signal that it has started // correctly. @@ -125,8 +149,8 @@ Status ThriftServer::ThriftServerEventProcessor::start_and_wait_for_server() { std::unique_lock<std::mutex> lock(_signal_lock); _thrift_server->_started = false; - _thrift_server->_server_thread.reset( - new std::thread(&ThriftServer::ThriftServerEventProcessor::supervise, this)); + _thrift_server->_server_thread = std::make_unique<std::thread>( + &ThriftServer::ThriftServerEventProcessor::supervise, this); // Loop protects against spurious wakeup. Locks provide necessary fences to ensure // visibility. @@ -314,7 +338,7 @@ Status ThriftServer::start() { std::shared_ptr<apache::thrift::transport::TServerTransport> fe_server_transport; std::shared_ptr<apache::thrift::transport::TTransportFactory> transport_factory; std::shared_ptr<apache::thrift::transport::TNonblockingServerSocket> socket = - std::make_shared<apache::thrift::transport::TNonblockingServerSocket>(_port); + std::make_shared<ImprovedNonblockingServerSocket>(_port); if (_server_type != THREADED) { thread_mgr = apache::thrift::concurrency::ThreadManager::newSimpleThreadManager( _num_worker_threads); @@ -328,40 +352,39 @@ Status ThriftServer::start() { switch (_server_type) { case NON_BLOCKING: - if (transport_factory.get() == nullptr) { + if (transport_factory == nullptr) { transport_factory.reset(new apache::thrift::transport::TTransportFactory()); } - _server.reset(new apache::thrift::server::TNonblockingServer( + _server = std::make_unique<apache::thrift::server::TNonblockingServer>( _processor, transport_factory, transport_factory, protocol_factory, - protocol_factory, socket, thread_mgr)); + protocol_factory, socket, thread_mgr); break; case THREAD_POOL: fe_server_transport.reset(new apache::thrift::transport::TServerSocket( BackendOptions::get_service_bind_address_without_bracket(), _port)); - if (transport_factory.get() == nullptr) { + if (transport_factory == nullptr) { transport_factory.reset(new apache::thrift::transport::TBufferedTransportFactory()); } - _server.reset(new apache::thrift::server::TThreadPoolServer( - _processor, fe_server_transport, transport_factory, protocol_factory, thread_mgr)); + _server = std::make_unique<apache::thrift::server::TThreadPoolServer>( + _processor, fe_server_transport, transport_factory, protocol_factory, thread_mgr); break; case THREADED: server_socket = new apache::thrift::transport::TServerSocket( BackendOptions::get_service_bind_address_without_bracket(), _port); - // server_socket->setAcceptTimeout(500); fe_server_transport.reset(server_socket); - if (transport_factory.get() == nullptr) { + if (transport_factory == nullptr) { transport_factory.reset(new apache::thrift::transport::TBufferedTransportFactory()); } - _server.reset(new apache::thrift::server::TThreadedServer( + _server = std::make_unique<apache::thrift::server::TThreadedServer>( _processor, fe_server_transport, transport_factory, protocol_factory, - thread_factory)); + thread_factory); break; default: --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org