This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new e2ed3a4df0e branch-3.0: [bugfix](be) split the usage of thread pool in 
internal service #49292 (#49324)
e2ed3a4df0e is described below

commit e2ed3a4df0e619666cb9bef1d0866abbec98bedc
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat May 17 11:10:15 2025 +0800

    branch-3.0: [bugfix](be) split the usage of thread pool in internal service 
#49292 (#49324)
    
    Cherry-picked from #49292
    
    Co-authored-by: yiguolei <[email protected]>
---
 be/src/runtime/load_stream_mgr.h     |  3 ---
 be/src/service/internal_service.cpp  | 48 +++++++++++++++++++-----------------
 be/test/runtime/load_stream_test.cpp |  1 -
 3 files changed, 25 insertions(+), 27 deletions(-)

diff --git a/be/src/runtime/load_stream_mgr.h b/be/src/runtime/load_stream_mgr.h
index dbbdbaf0070..daeb9add0ea 100644
--- a/be/src/runtime/load_stream_mgr.h
+++ b/be/src/runtime/load_stream_mgr.h
@@ -57,10 +57,8 @@ public:
     size_t get_load_stream_num() { return _load_streams_map.size(); }
 
     FifoThreadPool* heavy_work_pool() { return _heavy_work_pool; }
-    FifoThreadPool* light_work_pool() { return _light_work_pool; }
 
     void set_heavy_work_pool(FifoThreadPool* pool) { _heavy_work_pool = pool; }
-    void set_light_work_pool(FifoThreadPool* pool) { _light_work_pool = pool; }
 
 private:
     std::mutex _lock;
@@ -68,7 +66,6 @@ private:
     std::unique_ptr<ThreadPool> _file_writer_thread_pool;
 
     FifoThreadPool* _heavy_work_pool = nullptr;
-    FifoThreadPool* _light_work_pool = nullptr;
 };
 
 } // namespace doris
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 6b6a90eafe8..423c62c6c4b 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -192,6 +192,7 @@ private:
 
 PInternalService::PInternalService(ExecEnv* exec_env)
         : _exec_env(exec_env),
+          // heavy threadpool is used for load process and other process that will read disk or access network.
           _heavy_work_pool(config::brpc_heavy_work_pool_threads != -1
                                    ? config::brpc_heavy_work_pool_threads
                                    : std::max(128, CpuInfo::num_cores() * 4),
@@ -199,6 +200,8 @@ PInternalService::PInternalService(ExecEnv* exec_env)
                                    ? 
config::brpc_heavy_work_pool_max_queue_size
                                    : std::max(10240, CpuInfo::num_cores() * 
320),
                            "brpc_heavy"),
+
+          // light threadpool should be only used in query processing logic. All handlers should be very light, not locked, not access disk.
           _light_work_pool(config::brpc_light_work_pool_threads != -1
                                    ? config::brpc_light_work_pool_threads
                                    : std::max(128, CpuInfo::num_cores() * 4),
@@ -241,7 +244,6 @@ PInternalService::PInternalService(ExecEnv* exec_env)
                          []() { return 
config::brpc_arrow_flight_work_pool_threads; });
 
     _exec_env->load_stream_mgr()->set_heavy_work_pool(&_heavy_work_pool);
-    _exec_env->load_stream_mgr()->set_light_work_pool(&_light_work_pool);
 
     CHECK_EQ(0, bthread_key_create(&btls_key, thread_context_deleter));
     CHECK_EQ(0, bthread_key_create(&AsyncIO::btls_io_ctx_key, 
AsyncIO::io_ctx_key_deleter));
@@ -291,7 +293,7 @@ void 
PInternalService::tablet_writer_open(google::protobuf::RpcController* contr
                                           const PTabletWriterOpenRequest* 
request,
                                           PTabletWriterOpenResult* response,
                                           google::protobuf::Closure* done) {
-    bool ret = _light_work_pool.try_offer([this, request, response, done]() {
+    bool ret = _heavy_work_pool.try_offer([this, request, response, done]() {
         VLOG_RPC << "tablet writer open, id=" << request->id()
                  << ", index_id=" << request->index_id() << ", txn_id=" << 
request->txn_id();
         signal::set_signal_task_id(request->id());
@@ -305,7 +307,7 @@ void 
PInternalService::tablet_writer_open(google::protobuf::RpcController* contr
         st.to_protobuf(response->mutable_status());
     });
     if (!ret) {
-        offer_failed(response, done, _light_work_pool);
+        offer_failed(response, done, _heavy_work_pool);
         return;
     }
 }
@@ -399,7 +401,7 @@ void 
PInternalService::open_load_stream(google::protobuf::RpcController* control
                                         const POpenLoadStreamRequest* request,
                                         POpenLoadStreamResponse* response,
                                         google::protobuf::Closure* done) {
-    bool ret = _light_work_pool.try_offer([this, controller, request, 
response, done]() {
+    bool ret = _heavy_work_pool.try_offer([this, controller, request, 
response, done]() {
         signal::set_signal_task_id(request->load_id());
         brpc::ClosureGuard done_guard(done);
         brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
@@ -448,7 +450,7 @@ void 
PInternalService::open_load_stream(google::protobuf::RpcController* control
         st.to_protobuf(response->mutable_status());
     });
     if (!ret) {
-        offer_failed(response, done, _light_work_pool);
+        offer_failed(response, done, _heavy_work_pool);
     }
 }
 
@@ -503,7 +505,7 @@ void 
PInternalService::tablet_writer_cancel(google::protobuf::RpcController* con
                                             const PTabletWriterCancelRequest* 
request,
                                             PTabletWriterCancelResult* 
response,
                                             google::protobuf::Closure* done) {
-    bool ret = _light_work_pool.try_offer([this, request, done]() {
+    bool ret = _heavy_work_pool.try_offer([this, request, done]() {
         VLOG_RPC << "tablet writer cancel, id=" << request->id()
                  << ", index_id=" << request->index_id() << ", sender_id=" << 
request->sender_id();
         signal::set_signal_task_id(request->id());
@@ -516,7 +518,7 @@ void 
PInternalService::tablet_writer_cancel(google::protobuf::RpcController* con
         }
     });
     if (!ret) {
-        offer_failed(response, done, _light_work_pool);
+        offer_failed(response, done, _heavy_work_pool);
         return;
     }
 }
@@ -893,7 +895,7 @@ void 
PInternalService::fetch_arrow_flight_schema(google::protobuf::RpcController
                                                  const 
PFetchArrowFlightSchemaRequest* request,
                                                  
PFetchArrowFlightSchemaResult* result,
                                                  google::protobuf::Closure* 
done) {
-    bool ret = _light_work_pool.try_offer([request, result, done]() {
+    bool ret = _arrow_flight_work_pool.try_offer([request, result, done]() {
         brpc::ClosureGuard closure_guard(done);
         std::shared_ptr<arrow::Schema> schema;
         auto st = ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(
@@ -918,7 +920,7 @@ void 
PInternalService::fetch_arrow_flight_schema(google::protobuf::RpcController
         st.to_protobuf(result->mutable_status());
     });
     if (!ret) {
-        offer_failed(result, done, _heavy_work_pool);
+        offer_failed(result, done, _arrow_flight_work_pool);
         return;
     }
 }
@@ -1337,12 +1339,12 @@ void 
PInternalService::update_cache(google::protobuf::RpcController* controller,
 void PInternalService::fetch_cache(google::protobuf::RpcController* controller,
                                    const PFetchCacheRequest* request, 
PFetchCacheResult* result,
                                    google::protobuf::Closure* done) {
-    bool ret = _heavy_work_pool.try_offer([this, request, result, done]() {
+    bool ret = _light_work_pool.try_offer([this, request, result, done]() {
         brpc::ClosureGuard closure_guard(done);
         _exec_env->result_cache()->fetch(request, result);
     });
     if (!ret) {
-        offer_failed(result, done, _heavy_work_pool);
+        offer_failed(result, done, _light_work_pool);
         return;
     }
 }
@@ -1466,7 +1468,7 @@ void 
PInternalService::send_data(google::protobuf::RpcController* controller,
 void PInternalService::commit(google::protobuf::RpcController* controller,
                               const PCommitRequest* request, PCommitResult* 
response,
                               google::protobuf::Closure* done) {
-    bool ret = _light_work_pool.try_offer([this, request, response, done]() {
+    bool ret = _heavy_work_pool.try_offer([this, request, response, done]() {
         brpc::ClosureGuard closure_guard(done);
         TUniqueId load_id;
         load_id.hi = request->load_id().hi();
@@ -1482,7 +1484,7 @@ void 
PInternalService::commit(google::protobuf::RpcController* controller,
         }
     });
     if (!ret) {
-        offer_failed(response, done, _light_work_pool);
+        offer_failed(response, done, _heavy_work_pool);
         return;
     }
 }
@@ -1490,7 +1492,7 @@ void 
PInternalService::commit(google::protobuf::RpcController* controller,
 void PInternalService::rollback(google::protobuf::RpcController* controller,
                                 const PRollbackRequest* request, 
PRollbackResult* response,
                                 google::protobuf::Closure* done) {
-    bool ret = _light_work_pool.try_offer([this, request, response, done]() {
+    bool ret = _heavy_work_pool.try_offer([this, request, response, done]() {
         brpc::ClosureGuard closure_guard(done);
         TUniqueId load_id;
         load_id.hi = request->load_id().hi();
@@ -1505,7 +1507,7 @@ void 
PInternalService::rollback(google::protobuf::RpcController* controller,
         }
     });
     if (!ret) {
-        offer_failed(response, done, _light_work_pool);
+        offer_failed(response, done, _heavy_work_pool);
         return;
     }
 }
@@ -2008,7 +2010,7 @@ void 
PInternalServiceImpl::_response_pull_slave_rowset(const std::string& remote
 void PInternalServiceImpl::response_slave_tablet_pull_rowset(
         google::protobuf::RpcController* controller, const 
PTabletWriteSlaveDoneRequest* request,
         PTabletWriteSlaveDoneResult* response, google::protobuf::Closure* 
done) {
-    bool ret = _light_work_pool.try_offer([txn_mgr = _engine.txn_manager(), 
request, response,
+    bool ret = _heavy_work_pool.try_offer([txn_mgr = _engine.txn_manager(), 
request, response,
                                            done]() {
         brpc::ClosureGuard closure_guard(done);
         VLOG_CRITICAL << "receive the result of slave replica pull rowset from 
slave replica. "
@@ -2028,7 +2030,7 @@ void 
PInternalServiceImpl::response_slave_tablet_pull_rowset(
 void PInternalService::multiget_data(google::protobuf::RpcController* 
controller,
                                      const PMultiGetRequest* request, 
PMultiGetResponse* response,
                                      google::protobuf::Closure* done) {
-    bool ret = _light_work_pool.try_offer([request, response, done]() {
+    bool ret = _heavy_work_pool.try_offer([request, response, done]() {
         signal::set_signal_task_id(request->query_id());
         // multi get data by rowid
         MonotonicStopWatch watch;
@@ -2086,7 +2088,7 @@ void 
PInternalService::group_commit_insert(google::protobuf::RpcController* cont
     load_id.__set_lo(request->load_id().lo());
     std::shared_ptr<std::mutex> lock = std::make_shared<std::mutex>();
     std::shared_ptr<bool> is_done = std::make_shared<bool>(false);
-    bool ret = _light_work_pool.try_offer([this, request, response, done, 
load_id, lock,
+    bool ret = _heavy_work_pool.try_offer([this, request, response, done, 
load_id, lock,
                                            is_done]() {
         brpc::ClosureGuard closure_guard(done);
         std::shared_ptr<StreamLoadContext> ctx = 
std::make_shared<StreamLoadContext>(_exec_env);
@@ -2155,7 +2157,7 @@ void 
PInternalService::group_commit_insert(google::protobuf::RpcController* cont
     });
     if (!ret) {
         _exec_env->new_load_stream_mgr()->remove(load_id);
-        offer_failed(response, done, _light_work_pool);
+        offer_failed(response, done, _heavy_work_pool);
         return;
     }
 };
@@ -2164,7 +2166,7 @@ void 
PInternalService::get_wal_queue_size(google::protobuf::RpcController* contr
                                           const PGetWalQueueSizeRequest* 
request,
                                           PGetWalQueueSizeResponse* response,
                                           google::protobuf::Closure* done) {
-    bool ret = _light_work_pool.try_offer([this, request, response, done]() {
+    bool ret = _heavy_work_pool.try_offer([this, request, response, done]() {
         brpc::ClosureGuard closure_guard(done);
         Status st = Status::OK();
         auto table_id = request->table_id();
@@ -2173,7 +2175,7 @@ void 
PInternalService::get_wal_queue_size(google::protobuf::RpcController* contr
         response->mutable_status()->set_status_code(st.code());
     });
     if (!ret) {
-        offer_failed(response, done, _light_work_pool);
+        offer_failed(response, done, _heavy_work_pool);
     }
 }
 
@@ -2181,7 +2183,7 @@ void 
PInternalService::get_be_resource(google::protobuf::RpcController* controll
                                        const PGetBeResourceRequest* request,
                                        PGetBeResourceResponse* response,
                                        google::protobuf::Closure* done) {
-    bool ret = _light_work_pool.try_offer([response, done]() {
+    bool ret = _heavy_work_pool.try_offer([response, done]() {
         brpc::ClosureGuard closure_guard(done);
         int64_t mem_limit = MemInfo::mem_limit();
         int64_t mem_usage = PerfCounters::get_vm_rss();
@@ -2194,7 +2196,7 @@ void 
PInternalService::get_be_resource(google::protobuf::RpcController* controll
         response->mutable_status()->set_status_code(st.code());
     });
     if (!ret) {
-        offer_failed(response, done, _light_work_pool);
+        offer_failed(response, done, _heavy_work_pool);
     }
 }
 
diff --git a/be/test/runtime/load_stream_test.cpp 
b/be/test/runtime/load_stream_test.cpp
index b5066ccf169..054f2663acf 100644
--- a/be/test/runtime/load_stream_test.cpp
+++ b/be/test/runtime/load_stream_test.cpp
@@ -603,7 +603,6 @@ public:
 
         _load_stream_mgr = std::make_unique<LoadStreamMgr>(4);
         _load_stream_mgr->set_heavy_work_pool(&_heavy_work_pool);
-        _load_stream_mgr->set_light_work_pool(&_light_work_pool);
         _stream_service = new StreamService(_load_stream_mgr.get());
         CHECK_EQ(0, _server->AddService(_stream_service, 
brpc::SERVER_OWNS_SERVICE));
         brpc::ServerOptions server_options;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to