This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new e2ed3a4df0e branch-3.0: [bugfix](be) split the usage of thread pool in
internal service #49292 (#49324)
e2ed3a4df0e is described below
commit e2ed3a4df0e619666cb9bef1d0866abbec98bedc
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat May 17 11:10:15 2025 +0800
branch-3.0: [bugfix](be) split the usage of thread pool in internal service
#49292 (#49324)
Cherry-picked from #49292
Co-authored-by: yiguolei <[email protected]>
---
be/src/runtime/load_stream_mgr.h | 3 ---
be/src/service/internal_service.cpp | 48 +++++++++++++++++++-----------------
be/test/runtime/load_stream_test.cpp | 1 -
3 files changed, 25 insertions(+), 27 deletions(-)
diff --git a/be/src/runtime/load_stream_mgr.h b/be/src/runtime/load_stream_mgr.h
index dbbdbaf0070..daeb9add0ea 100644
--- a/be/src/runtime/load_stream_mgr.h
+++ b/be/src/runtime/load_stream_mgr.h
@@ -57,10 +57,8 @@ public:
size_t get_load_stream_num() { return _load_streams_map.size(); }
FifoThreadPool* heavy_work_pool() { return _heavy_work_pool; }
- FifoThreadPool* light_work_pool() { return _light_work_pool; }
void set_heavy_work_pool(FifoThreadPool* pool) { _heavy_work_pool = pool; }
- void set_light_work_pool(FifoThreadPool* pool) { _light_work_pool = pool; }
private:
std::mutex _lock;
@@ -68,7 +66,6 @@ private:
std::unique_ptr<ThreadPool> _file_writer_thread_pool;
FifoThreadPool* _heavy_work_pool = nullptr;
- FifoThreadPool* _light_work_pool = nullptr;
};
} // namespace doris
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index 6b6a90eafe8..423c62c6c4b 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -192,6 +192,7 @@ private:
PInternalService::PInternalService(ExecEnv* exec_env)
: _exec_env(exec_env),
+ // heavy threadpool is used for load processing and other processes that
will read disk or access the network.
_heavy_work_pool(config::brpc_heavy_work_pool_threads != -1
? config::brpc_heavy_work_pool_threads
: std::max(128, CpuInfo::num_cores() * 4),
@@ -199,6 +200,8 @@ PInternalService::PInternalService(ExecEnv* exec_env)
?
config::brpc_heavy_work_pool_max_queue_size
: std::max(10240, CpuInfo::num_cores() *
320),
"brpc_heavy"),
+
+ // light threadpool should only be used in query processing logic.
All handlers should be very light, not locked, not access disk.
_light_work_pool(config::brpc_light_work_pool_threads != -1
? config::brpc_light_work_pool_threads
: std::max(128, CpuInfo::num_cores() * 4),
@@ -241,7 +244,6 @@ PInternalService::PInternalService(ExecEnv* exec_env)
[]() { return
config::brpc_arrow_flight_work_pool_threads; });
_exec_env->load_stream_mgr()->set_heavy_work_pool(&_heavy_work_pool);
- _exec_env->load_stream_mgr()->set_light_work_pool(&_light_work_pool);
CHECK_EQ(0, bthread_key_create(&btls_key, thread_context_deleter));
CHECK_EQ(0, bthread_key_create(&AsyncIO::btls_io_ctx_key,
AsyncIO::io_ctx_key_deleter));
@@ -291,7 +293,7 @@ void
PInternalService::tablet_writer_open(google::protobuf::RpcController* contr
const PTabletWriterOpenRequest*
request,
PTabletWriterOpenResult* response,
google::protobuf::Closure* done) {
- bool ret = _light_work_pool.try_offer([this, request, response, done]() {
+ bool ret = _heavy_work_pool.try_offer([this, request, response, done]() {
VLOG_RPC << "tablet writer open, id=" << request->id()
<< ", index_id=" << request->index_id() << ", txn_id=" <<
request->txn_id();
signal::set_signal_task_id(request->id());
@@ -305,7 +307,7 @@ void
PInternalService::tablet_writer_open(google::protobuf::RpcController* contr
st.to_protobuf(response->mutable_status());
});
if (!ret) {
- offer_failed(response, done, _light_work_pool);
+ offer_failed(response, done, _heavy_work_pool);
return;
}
}
@@ -399,7 +401,7 @@ void
PInternalService::open_load_stream(google::protobuf::RpcController* control
const POpenLoadStreamRequest* request,
POpenLoadStreamResponse* response,
google::protobuf::Closure* done) {
- bool ret = _light_work_pool.try_offer([this, controller, request,
response, done]() {
+ bool ret = _heavy_work_pool.try_offer([this, controller, request,
response, done]() {
signal::set_signal_task_id(request->load_id());
brpc::ClosureGuard done_guard(done);
brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
@@ -448,7 +450,7 @@ void
PInternalService::open_load_stream(google::protobuf::RpcController* control
st.to_protobuf(response->mutable_status());
});
if (!ret) {
- offer_failed(response, done, _light_work_pool);
+ offer_failed(response, done, _heavy_work_pool);
}
}
@@ -503,7 +505,7 @@ void
PInternalService::tablet_writer_cancel(google::protobuf::RpcController* con
const PTabletWriterCancelRequest*
request,
PTabletWriterCancelResult*
response,
google::protobuf::Closure* done) {
- bool ret = _light_work_pool.try_offer([this, request, done]() {
+ bool ret = _heavy_work_pool.try_offer([this, request, done]() {
VLOG_RPC << "tablet writer cancel, id=" << request->id()
<< ", index_id=" << request->index_id() << ", sender_id=" <<
request->sender_id();
signal::set_signal_task_id(request->id());
@@ -516,7 +518,7 @@ void
PInternalService::tablet_writer_cancel(google::protobuf::RpcController* con
}
});
if (!ret) {
- offer_failed(response, done, _light_work_pool);
+ offer_failed(response, done, _heavy_work_pool);
return;
}
}
@@ -893,7 +895,7 @@ void
PInternalService::fetch_arrow_flight_schema(google::protobuf::RpcController
const
PFetchArrowFlightSchemaRequest* request,
PFetchArrowFlightSchemaResult* result,
google::protobuf::Closure*
done) {
- bool ret = _light_work_pool.try_offer([request, result, done]() {
+ bool ret = _arrow_flight_work_pool.try_offer([request, result, done]() {
brpc::ClosureGuard closure_guard(done);
std::shared_ptr<arrow::Schema> schema;
auto st = ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(
@@ -918,7 +920,7 @@ void
PInternalService::fetch_arrow_flight_schema(google::protobuf::RpcController
st.to_protobuf(result->mutable_status());
});
if (!ret) {
- offer_failed(result, done, _heavy_work_pool);
+ offer_failed(result, done, _arrow_flight_work_pool);
return;
}
}
@@ -1337,12 +1339,12 @@ void
PInternalService::update_cache(google::protobuf::RpcController* controller,
void PInternalService::fetch_cache(google::protobuf::RpcController* controller,
const PFetchCacheRequest* request,
PFetchCacheResult* result,
google::protobuf::Closure* done) {
- bool ret = _heavy_work_pool.try_offer([this, request, result, done]() {
+ bool ret = _light_work_pool.try_offer([this, request, result, done]() {
brpc::ClosureGuard closure_guard(done);
_exec_env->result_cache()->fetch(request, result);
});
if (!ret) {
- offer_failed(result, done, _heavy_work_pool);
+ offer_failed(result, done, _light_work_pool);
return;
}
}
@@ -1466,7 +1468,7 @@ void
PInternalService::send_data(google::protobuf::RpcController* controller,
void PInternalService::commit(google::protobuf::RpcController* controller,
const PCommitRequest* request, PCommitResult*
response,
google::protobuf::Closure* done) {
- bool ret = _light_work_pool.try_offer([this, request, response, done]() {
+ bool ret = _heavy_work_pool.try_offer([this, request, response, done]() {
brpc::ClosureGuard closure_guard(done);
TUniqueId load_id;
load_id.hi = request->load_id().hi();
@@ -1482,7 +1484,7 @@ void
PInternalService::commit(google::protobuf::RpcController* controller,
}
});
if (!ret) {
- offer_failed(response, done, _light_work_pool);
+ offer_failed(response, done, _heavy_work_pool);
return;
}
}
@@ -1490,7 +1492,7 @@ void
PInternalService::commit(google::protobuf::RpcController* controller,
void PInternalService::rollback(google::protobuf::RpcController* controller,
const PRollbackRequest* request,
PRollbackResult* response,
google::protobuf::Closure* done) {
- bool ret = _light_work_pool.try_offer([this, request, response, done]() {
+ bool ret = _heavy_work_pool.try_offer([this, request, response, done]() {
brpc::ClosureGuard closure_guard(done);
TUniqueId load_id;
load_id.hi = request->load_id().hi();
@@ -1505,7 +1507,7 @@ void
PInternalService::rollback(google::protobuf::RpcController* controller,
}
});
if (!ret) {
- offer_failed(response, done, _light_work_pool);
+ offer_failed(response, done, _heavy_work_pool);
return;
}
}
@@ -2008,7 +2010,7 @@ void
PInternalServiceImpl::_response_pull_slave_rowset(const std::string& remote
void PInternalServiceImpl::response_slave_tablet_pull_rowset(
google::protobuf::RpcController* controller, const
PTabletWriteSlaveDoneRequest* request,
PTabletWriteSlaveDoneResult* response, google::protobuf::Closure*
done) {
- bool ret = _light_work_pool.try_offer([txn_mgr = _engine.txn_manager(),
request, response,
+ bool ret = _heavy_work_pool.try_offer([txn_mgr = _engine.txn_manager(),
request, response,
done]() {
brpc::ClosureGuard closure_guard(done);
VLOG_CRITICAL << "receive the result of slave replica pull rowset from
slave replica. "
@@ -2028,7 +2030,7 @@ void
PInternalServiceImpl::response_slave_tablet_pull_rowset(
void PInternalService::multiget_data(google::protobuf::RpcController*
controller,
const PMultiGetRequest* request,
PMultiGetResponse* response,
google::protobuf::Closure* done) {
- bool ret = _light_work_pool.try_offer([request, response, done]() {
+ bool ret = _heavy_work_pool.try_offer([request, response, done]() {
signal::set_signal_task_id(request->query_id());
// multi get data by rowid
MonotonicStopWatch watch;
@@ -2086,7 +2088,7 @@ void
PInternalService::group_commit_insert(google::protobuf::RpcController* cont
load_id.__set_lo(request->load_id().lo());
std::shared_ptr<std::mutex> lock = std::make_shared<std::mutex>();
std::shared_ptr<bool> is_done = std::make_shared<bool>(false);
- bool ret = _light_work_pool.try_offer([this, request, response, done,
load_id, lock,
+ bool ret = _heavy_work_pool.try_offer([this, request, response, done,
load_id, lock,
is_done]() {
brpc::ClosureGuard closure_guard(done);
std::shared_ptr<StreamLoadContext> ctx =
std::make_shared<StreamLoadContext>(_exec_env);
@@ -2155,7 +2157,7 @@ void
PInternalService::group_commit_insert(google::protobuf::RpcController* cont
});
if (!ret) {
_exec_env->new_load_stream_mgr()->remove(load_id);
- offer_failed(response, done, _light_work_pool);
+ offer_failed(response, done, _heavy_work_pool);
return;
}
};
@@ -2164,7 +2166,7 @@ void
PInternalService::get_wal_queue_size(google::protobuf::RpcController* contr
const PGetWalQueueSizeRequest*
request,
PGetWalQueueSizeResponse* response,
google::protobuf::Closure* done) {
- bool ret = _light_work_pool.try_offer([this, request, response, done]() {
+ bool ret = _heavy_work_pool.try_offer([this, request, response, done]() {
brpc::ClosureGuard closure_guard(done);
Status st = Status::OK();
auto table_id = request->table_id();
@@ -2173,7 +2175,7 @@ void
PInternalService::get_wal_queue_size(google::protobuf::RpcController* contr
response->mutable_status()->set_status_code(st.code());
});
if (!ret) {
- offer_failed(response, done, _light_work_pool);
+ offer_failed(response, done, _heavy_work_pool);
}
}
@@ -2181,7 +2183,7 @@ void
PInternalService::get_be_resource(google::protobuf::RpcController* controll
const PGetBeResourceRequest* request,
PGetBeResourceResponse* response,
google::protobuf::Closure* done) {
- bool ret = _light_work_pool.try_offer([response, done]() {
+ bool ret = _heavy_work_pool.try_offer([response, done]() {
brpc::ClosureGuard closure_guard(done);
int64_t mem_limit = MemInfo::mem_limit();
int64_t mem_usage = PerfCounters::get_vm_rss();
@@ -2194,7 +2196,7 @@ void
PInternalService::get_be_resource(google::protobuf::RpcController* controll
response->mutable_status()->set_status_code(st.code());
});
if (!ret) {
- offer_failed(response, done, _light_work_pool);
+ offer_failed(response, done, _heavy_work_pool);
}
}
diff --git a/be/test/runtime/load_stream_test.cpp
b/be/test/runtime/load_stream_test.cpp
index b5066ccf169..054f2663acf 100644
--- a/be/test/runtime/load_stream_test.cpp
+++ b/be/test/runtime/load_stream_test.cpp
@@ -603,7 +603,6 @@ public:
_load_stream_mgr = std::make_unique<LoadStreamMgr>(4);
_load_stream_mgr->set_heavy_work_pool(&_heavy_work_pool);
- _load_stream_mgr->set_light_work_pool(&_light_work_pool);
_stream_service = new StreamService(_load_stream_mgr.get());
CHECK_EQ(0, _server->AddService(_stream_service,
brpc::SERVER_OWNS_SERVICE));
brpc::ServerOptions server_options;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]