github-actions[bot] commented on code in PR #26078: URL: https://github.com/apache/doris/pull/26078#discussion_r1375933165
########## be/src/pipeline/pipeline_x/dependency.cpp: ########## @@ -326,4 +330,114 @@ Status HashJoinDependency::extract_join_column(vectorized::Block& block, return Status::OK(); } +bool RuntimeFilterTimer::has_ready() { + std::unique_lock<std::mutex> lc(_lock); + return _runtime_filter->is_ready(); +} + +void RuntimeFilterTimer::call_timeout() { + std::unique_lock<std::mutex> lc(_lock); + if (_call_ready) { + return; + } + _call_timeout = true; + if (_parent) { + _parent->sub_filters(); + } +} + +void RuntimeFilterTimer::call_ready() { + std::unique_lock<std::mutex> lc(_lock); + if (_call_timeout) { + return; + } + _call_ready = true; + if (_parent) { + _parent->sub_filters(); + } +} + +void RuntimeFilterTimer::call_has_ready() { + std::unique_lock<std::mutex> lc(_lock); + DCHECK(!_call_timeout); + if (!_call_ready) { + _parent->sub_filters(); + } +} + +void RuntimeFilterTimer::call_has_release() { + // When the use count is equal to 1, only the timer queue still holds ownership, + // so there is no need to take any action. +} + +struct RuntimeFilterTimerQueue { + constexpr static int64_t interval = 50; + void start() { + while (true) { + std::unique_lock<std::mutex> lk(cv_m); + + cv.wait(lk, [this] { return !_que.empty(); }); + { + std::unique_lock<std::mutex> lc(_que_lock); + std::list<std::shared_ptr<RuntimeFilterTimer>> new_que; + for (auto& it : _que) { + if (it.use_count() == 1) { + it->call_has_release(); + } else if (it->has_ready()) { + it->call_has_ready(); + } else { + int64_t ms_since_registration = MonotonicMillis() - it->registration_time(); + if (ms_since_registration > it->wait_time_ms()) { + it->call_timeout(); + } else { + new_que.push_back(std::move(it)); + } + } + } + new_que.swap(_que); + } + std::this_thread::sleep_for(std::chrono::milliseconds(interval)); + } + } + ~RuntimeFilterTimerQueue() { _thread.detach(); } + RuntimeFilterTimerQueue() { _thread = std::thread(&RuntimeFilterTimerQueue::start, this); } + static void push_filter_timer(std::shared_ptr<RuntimeFilterTimer> filter) { + static RuntimeFilterTimerQueue timer_que; + + timer_que.push(filter); + } + + void push(std::shared_ptr<RuntimeFilterTimer> filter) { + std::unique_lock<std::mutex> lc(_que_lock); + _que.push_back(filter); + cv.notify_all(); + } + + std::thread _thread; + std::condition_variable cv; + std::mutex cv_m; + std::mutex _que_lock; + + std::list<std::shared_ptr<RuntimeFilterTimer>> _que; +}; + +void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) { Review Comment: warning: method 'add_filters' can be made static [readability-convert-member-functions-to-static] be/src/pipeline/pipeline_x/dependency.h:249: ```diff - void add_filters(IRuntimeFilter* runtime_filter); + static void add_filters(IRuntimeFilter* runtime_filter); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org