This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1df73575cf5 [Bug](load) fix use after free on load channel in work
load group scheduler (#36272)
1df73575cf5 is described below
commit 1df73575cf5f589d0b95eb61434e17d8444ad825
Author: Pxl <[email protected]>
AuthorDate: Thu Jun 13 23:00:27 2024 +0800
[Bug](load) fix use after free on load channel in work load group scheduler
(#36272)
## Proposed changes
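Fix a use-after-free on the load channel in the workload group scheduler,
introduced by #36176. `LoadChannel` and `LoadStream` called
`FragmentMgr::get_or_erase_query_ctx()` without holding `FragmentMgr::_lock`,
so their lookups into `_query_ctx_map` were not synchronized with the other
callers, which all take `_lock` first. The unlocked lookup is now the private
`_get_or_erase_query_ctx()`. Those two callers use the new public
`get_or_erase_query_ctx_with_lock()`, which acquires `_lock` before delegating.
The `get_query_context()` helper is removed.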
---
be/src/runtime/fragment_mgr.cpp | 37 ++++++++++++++++---------------------
be/src/runtime/fragment_mgr.h | 6 +++---
be/src/runtime/load_channel.cpp | 3 ++-
be/src/runtime/load_stream.cpp | 2 +-
4 files changed, 22 insertions(+), 26 deletions(-)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index e8b25a96717..9451135da6e 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -529,7 +529,7 @@ Status FragmentMgr::start_query_execution(const
PExecPlanFragmentStartRequest* r
TUniqueId query_id;
query_id.__set_hi(request->query_id().hi());
query_id.__set_lo(request->query_id().lo());
- if (auto q_ctx = get_or_erase_query_ctx(query_id)) {
+ if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
q_ctx->set_ready_to_execute(Status::OK());
} else {
return Status::InternalError(
@@ -560,7 +560,7 @@ void FragmentMgr::remove_pipeline_context(
}
}
-std::shared_ptr<QueryContext> FragmentMgr::get_or_erase_query_ctx(TUniqueId
query_id) {
+std::shared_ptr<QueryContext> FragmentMgr::_get_or_erase_query_ctx(const
TUniqueId& query_id) {
auto search = _query_ctx_map.find(query_id);
if (search != _query_ctx_map.end()) {
if (auto q_ctx = search->second.lock()) {
@@ -575,13 +575,19 @@ std::shared_ptr<QueryContext>
FragmentMgr::get_or_erase_query_ctx(TUniqueId quer
return nullptr;
}
+std::shared_ptr<QueryContext> FragmentMgr::get_or_erase_query_ctx_with_lock(
+ const TUniqueId& query_id) {
+ std::unique_lock<std::mutex> lock(_lock);
+ return _get_or_erase_query_ctx(query_id);
+}
+
template <typename Params>
Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id,
bool pipeline,
std::shared_ptr<QueryContext>& query_ctx) {
if (params.is_simplified_param) {
// Get common components from _query_ctx_map
std::lock_guard<std::mutex> lock(_lock);
- if (auto q_ctx = get_or_erase_query_ctx(query_id)) {
+ if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
query_ctx = q_ctx;
} else {
return Status::InternalError(
@@ -593,7 +599,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params,
TUniqueId query_id, boo
// Find _query_ctx_map, in case some other request has already
// create the query fragments context.
std::lock_guard<std::mutex> lock(_lock);
- if (auto q_ctx = get_or_erase_query_ctx(query_id)) {
+ if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
query_ctx = q_ctx;
return Status::OK();
}
@@ -691,7 +697,7 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t
duration) {
}
std::string FragmentMgr::dump_pipeline_tasks(TUniqueId& query_id) {
- if (auto q_ctx = get_or_erase_query_ctx(query_id)) {
+ if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
return q_ctx->print_all_pipeline_context();
} else {
return fmt::format("Query context (query id = {}) not found. \n",
print_id(query_id));
@@ -787,23 +793,12 @@ void FragmentMgr::_set_scan_concurrency(const Param&
params, QueryContext* query
#endif
}
-Status FragmentMgr::get_query_context(const TUniqueId& query_id,
- std::shared_ptr<QueryContext>*
query_ctx) {
- std::lock_guard<std::mutex> state_lock(_lock);
- if (auto q_ctx = get_or_erase_query_ctx(query_id)) {
- *query_ctx = q_ctx;
- } else {
- return Status::InternalError("Query context not found for query {}",
print_id(query_id));
- }
- return Status::OK();
-}
-
void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) {
std::shared_ptr<QueryContext> query_ctx = nullptr;
std::vector<TUniqueId> all_instance_ids;
{
std::lock_guard<std::mutex> state_lock(_lock);
- if (auto q_ctx = get_or_erase_query_ctx(query_id)) {
+ if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
query_ctx = q_ctx;
// Copy instanceids to avoid concurrent modification.
// And to reduce the scope of lock.
@@ -1137,7 +1132,7 @@ Status FragmentMgr::send_filter_size(const
PSendFilterSizeRequest* request) {
query_id.__set_hi(queryid.hi);
query_id.__set_lo(queryid.lo);
std::lock_guard<std::mutex> lock(_lock);
- if (auto q_ctx = get_or_erase_query_ctx(query_id)) {
+ if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
query_ctx = q_ctx;
} else {
return Status::InvalidArgument("Query context (query-id: {}) not
found",
@@ -1156,7 +1151,7 @@ Status FragmentMgr::sync_filter_size(const
PSyncFilterSizeRequest* request) {
query_id.__set_hi(queryid.hi);
query_id.__set_lo(queryid.lo);
std::lock_guard<std::mutex> lock(_lock);
- if (auto q_ctx = get_or_erase_query_ctx(query_id)) {
+ if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
query_ctx = q_ctx;
} else {
return Status::InvalidArgument("Query context (query-id: {}) not
found",
@@ -1178,7 +1173,7 @@ Status FragmentMgr::merge_filter(const
PMergeFilterRequest* request,
query_id.__set_hi(queryid.hi);
query_id.__set_lo(queryid.lo);
std::lock_guard<std::mutex> lock(_lock);
- if (auto q_ctx = get_or_erase_query_ctx(query_id)) {
+ if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
query_ctx = q_ctx;
} else {
return Status::InvalidArgument("Query context (query-id: {}) not
found",
@@ -1219,7 +1214,7 @@ Status FragmentMgr::get_realtime_exec_status(const
TUniqueId& query_id,
{
std::lock_guard<std::mutex> lock(_lock);
- if (auto q_ctx = get_or_erase_query_ctx(query_id)) {
+ if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
query_context = q_ctx;
} else {
return Status::NotFound("Query {} has been released",
print_id(query_id));
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 5355f51a217..dba9bcde398 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -135,8 +135,6 @@ public:
ThreadPool* get_thread_pool() { return _thread_pool.get(); }
- Status get_query_context(const TUniqueId& query_id,
std::shared_ptr<QueryContext>* query_ctx);
-
int32_t running_query_num() {
std::unique_lock<std::mutex> ctx_lock(_lock);
return _query_ctx_map.size();
@@ -150,9 +148,11 @@ public:
Status get_realtime_exec_status(const TUniqueId& query_id,
TReportExecStatusParams* exec_status);
- std::shared_ptr<QueryContext> get_or_erase_query_ctx(TUniqueId query_id);
+ std::shared_ptr<QueryContext> get_or_erase_query_ctx_with_lock(const
TUniqueId& query_id);
private:
+ std::shared_ptr<QueryContext> _get_or_erase_query_ctx(const TUniqueId&
query_id);
+
template <typename Param>
void _set_scan_concurrency(const Param& params, QueryContext* query_ctx);
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index f307a6e3545..cd3f3aa5af5 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -43,7 +43,8 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t
timeout_s, bool is_hig
_backend_id(backend_id),
_enable_profile(enable_profile) {
std::shared_ptr<QueryContext> query_context =
-
ExecEnv::GetInstance()->fragment_mgr()->get_or_erase_query_ctx(_load_id.to_thrift());
+
ExecEnv::GetInstance()->fragment_mgr()->get_or_erase_query_ctx_with_lock(
+ _load_id.to_thrift());
if (query_context != nullptr) {
_query_thread_context = {_load_id.to_thrift(),
query_context->query_mem_tracker};
} else {
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index d4132cada9f..b896994b1ef 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -336,7 +336,7 @@ LoadStream::LoadStream(PUniqueId load_id, LoadStreamMgr*
load_stream_mgr, bool e
TUniqueId load_tid = ((UniqueId)load_id).to_thrift();
#ifndef BE_TEST
std::shared_ptr<QueryContext> query_context =
-
ExecEnv::GetInstance()->fragment_mgr()->get_or_erase_query_ctx(load_tid);
+
ExecEnv::GetInstance()->fragment_mgr()->get_or_erase_query_ctx_with_lock(load_tid);
if (query_context != nullptr) {
_query_thread_context = {load_tid, query_context->query_mem_tracker};
} else {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]