github-actions[bot] commented on code in PR #60812:
URL: https://github.com/apache/doris/pull/60812#discussion_r2884487679
##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -1370,6 +1405,10 @@ Status FragmentMgr::sync_filter_size(const
PSyncFilterSizeRequest* request) {
query_id.__set_hi(queryid.hi);
query_id.__set_lo(queryid.lo);
if (auto q_ctx = get_query_ctx(query_id)) {
+ // just discard low stage request
Review Comment:
**[Minor] Unused variable**: `rf_ids` is assigned but never used. The next
line passes `it->second.deregister_runtime_filter_ids` directly. This looks
like a leftover from an earlier iteration. It should be removed to avoid
compiler warnings (`-Wunused-variable`).
##########
be/src/runtime/query_context.h:
##########
@@ -410,7 +413,30 @@ class QueryContext : public
std::enable_shared_from_this<QueryContext> {
timespec get_query_arrival_timestamp() const { return
this->_query_arrival_timestamp; }
QuerySource get_query_source() const { return this->_query_source; }
- const TQueryOptions get_query_options() const { return _query_options; }
+ TQueryOptions get_query_options() const { return _query_options; }
Review Comment:
**[Minor] Unnecessary copy**: `filter_ids` is taken by value
(`std::set<int32_t> filter_ids`) but is only iterated, never modified. Consider
taking it by `const std::set<int32_t>&` to avoid the copy.
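A minimal sketch of the suggested signature change. The function name `sum_filter_ids` and its body are hypothetical stand-ins, not code from the PR; only the parameter type reflects the suggestion.

```cpp
#include <cstdint>
#include <set>

// Hypothetical helper (not from the PR): it only iterates the set, so a
// const reference is enough and the caller's set is never copied.
int64_t sum_filter_ids(const std::set<int32_t>& filter_ids) {
    int64_t total = 0;
    for (int32_t id : filter_ids) {
        total += id;
    }
    return total;
}
```

Callers do not need to change: an lvalue `std::set<int32_t>` binds to the const reference directly.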
##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -1370,6 +1405,10 @@ Status FragmentMgr::sync_filter_size(const
PSyncFilterSizeRequest* request) {
query_id.__set_hi(queryid.hi);
query_id.__set_lo(queryid.lo);
if (auto q_ctx = get_query_ctx(query_id)) {
+ // just discard low stage request
+ if (q_ctx->get_stage(request->filter_id()) != request->stage()) {
+ return Status::OK();
+ }
Review Comment:
**[High] Dangling Pointer Risk**: `info` is a raw pointer into
`_rerunnable_params_map`, but the `_rerunnable_params_lock` has been released
at this point (line 1566). If a concurrent `remove_query_context()` or `stop()`
erases this entry from the map, `info` becomes a dangling pointer, and any
subsequent access (`info->params`, `info->finish_callback`, etc.) is a
use-after-free.
This can happen if the query is cancelled while `recreate_and_submit` is in
progress. The `q_ctx` shared_ptr keeps the QueryContext alive, but does NOT
prevent the `RerunableFragmentInfo` struct from being erased from
`_rerunnable_params_map`.
Fix: Copy out the needed fields (`params`, `finish_callback`) while still
holding the lock, rather than escaping a raw pointer.
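A minimal sketch of the copy-under-lock pattern, assuming simplified types. `RerunInfo`, `RerunRegistry`, and `snapshot` are hypothetical names for illustration, and `std::string` stands in for the real params type; this is not the actual `FragmentMgr` code.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical stand-in for the per-fragment rerun info described above.
struct RerunInfo {
    std::string params;                    // placeholder for the real params type
    std::function<void()> finish_callback;
};

// Hypothetical registry guarding a map with a single mutex.
class RerunRegistry {
public:
    void put(int fragment_id, RerunInfo info) {
        std::lock_guard<std::mutex> l(_lock);
        _map[fragment_id] = std::move(info);
    }

    void erase(int fragment_id) {
        std::lock_guard<std::mutex> l(_lock);
        _map.erase(fragment_id);
    }

    // Copies the entry into *out while the lock is held, so the caller never
    // keeps a pointer or iterator into the map after the lock is released.
    // Returns false if the entry does not exist.
    bool snapshot(int fragment_id, RerunInfo* out) const {
        assert(out != nullptr);
        std::lock_guard<std::mutex> l(_lock);
        auto it = _map.find(fragment_id);
        if (it == _map.end()) {
            return false;
        }
        *out = it->second;
        return true;
    }

private:
    mutable std::mutex _lock;
    std::map<int, RerunInfo> _map;
};
```

With this shape, a concurrent `erase` after `snapshot` returns cannot invalidate what the caller holds, at the cost of one copy per lookup.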
##########
be/src/pipeline/pipeline_fragment_context.cpp:
##########
@@ -179,9 +179,16 @@ void PipelineFragmentContext::cancel(const Status reason) {
{
std::lock_guard<std::mutex> l(_task_mutex);
if (_closed_tasks >= _total_tasks) {
+ if (_need_notify_close) {
+ // if fragment cancelled and waiting for notify to close, need
to remove from fragment mgr
+ _exec_env->fragment_mgr()->remove_pipeline_context({_query_id,
_fragment_id});
+ _need_notify_close = false;
+ }
// All tasks in this PipelineXFragmentContext already closed.
return;
}
+ // make fragment release by self after cancel
Review Comment:
**[Nit] Extra indentation space**: This line has 9 leading spaces (`+
// make...`) but the surrounding code uses 8 spaces (two levels of 4). The
next line `_need_notify_close = false;` also has extra indentation. Please fix
to match the surrounding style.
##########
be/src/runtime/query_context.h:
##########
@@ -394,6 +394,9 @@ class QueryContext : public
std::enable_shared_from_this<QueryContext> {
std::map<std::pair<TUniqueId, int>, pipeline::RecCTEScanLocalState*>
_cte_scan;
std::mutex _cte_scan_lock;
+ // for represent the rf's fragment execution round number of recursive cte
+ std::unordered_map<int, uint32_t> _filter_id_to_stage; // filter id ->
stage number
Review Comment:
**[Critical] Data Race on `_filter_id_to_stage`**: This `unordered_map` is
read by `get_stage()` from multiple threads (brpc workers in `apply_filterv2`,
`send_filter_size`, `sync_filter_size`, `merge_filter`, and pipeline execution
threads in `RuntimeFilterProducer::send_size`, `_send_rf_to_target`) and
written by `update_filters_stage()` from the rerun handler thread. There is
**no synchronization** protecting this map.
`std::unordered_map::operator[]` in the writer can trigger a rehash,
invalidating iterators used by concurrent `find()` calls in readers. Even
without a rehash, an unsynchronized read concurrent with a write to the same
container is a data race, which is undefined behavior and can cause crashes or
corrupted reads.
Compare with the nearby `_cte_scan` map (line 394), which has its own
`_cte_scan_lock`. A similar `std::mutex` or `std::shared_mutex` is needed here;
alternatively, a lock-free approach could be used (e.g., an array of
`std::atomic<uint32_t>` if filter IDs are bounded).
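A minimal sketch of the `std::shared_mutex` option, assuming C++17. The class `FilterStageMap` and the writer `bump_stage` are hypothetical; returning stage 0 for an unknown filter ID is also an assumption and may not match what `get_stage()` does in the PR.

```cpp
#include <cstdint>
#include <mutex>
#include <shared_mutex>
#include <unordered_map>

// Hypothetical wrapper around the map; not the actual QueryContext code.
class FilterStageMap {
public:
    // Readers take a shared lock, so concurrent get_stage() calls do not
    // block each other. Assumption: an unknown filter id reports stage 0.
    uint32_t get_stage(int filter_id) const {
        std::shared_lock<std::shared_mutex> l(_lock);
        auto it = _filter_id_to_stage.find(filter_id);
        return it == _filter_id_to_stage.end() ? 0 : it->second;
    }

    // The writer takes an exclusive lock, so an insertion or rehash caused by
    // operator[] can never overlap with a reader.
    void bump_stage(int filter_id) {
        std::unique_lock<std::shared_mutex> l(_lock);
        ++_filter_id_to_stage[filter_id];
    }

private:
    mutable std::shared_mutex _lock;
    std::unordered_map<int, uint32_t> _filter_id_to_stage; // filter id -> stage number
};
```

A shared lock suits this access pattern because reads come from many threads while writes happen only on rerun; a plain `std::mutex` would also be correct if the read path is not hot.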
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]