github-actions[bot] commented on code in PR #26078:
URL: https://github.com/apache/doris/pull/26078#discussion_r1375933165


##########
be/src/pipeline/pipeline_x/dependency.cpp:
##########
@@ -326,4 +330,114 @@ Status 
HashJoinDependency::extract_join_column(vectorized::Block& block,
     return Status::OK();
 }
 
+bool RuntimeFilterTimer::has_ready() {
+    std::unique_lock<std::mutex> lc(_lock);
+    return _runtime_filter->is_ready();
+}
+
+void RuntimeFilterTimer::call_timeout() {
+    std::unique_lock<std::mutex> lc(_lock);
+    if (_call_ready) {
+        return;
+    }
+    _call_timeout = true;
+    if (_parent) {
+        _parent->sub_filters();
+    }
+}
+
+void RuntimeFilterTimer::call_ready() {
+    std::unique_lock<std::mutex> lc(_lock);
+    if (_call_timeout) {
+        return;
+    }
+    _call_ready = true;
+    if (_parent) {
+        _parent->sub_filters();
+    }
+}
+
+void RuntimeFilterTimer::call_has_ready() {
+    std::unique_lock<std::mutex> lc(_lock);
+    DCHECK(!_call_timeout);
+    if (!_call_ready) {
+        _parent->sub_filters();
+    }
+}
+
+void RuntimeFilterTimer::call_has_release() {
+    // When the use count is equal to 1, only the timer queue still holds 
ownership,
+    // so there is no need to take any action.
+}
+
+struct RuntimeFilterTimerQueue {
+    constexpr static int64_t interval = 50;
+    void start() {
+        while (true) {
+            std::unique_lock<std::mutex> lk(cv_m);
+
+            cv.wait(lk, [this] { return !_que.empty(); });
+            {
+                std::unique_lock<std::mutex> lc(_que_lock);
+                std::list<std::shared_ptr<RuntimeFilterTimer>> new_que;
+                for (auto& it : _que) {
+                    if (it.use_count() == 1) {
+                        it->call_has_release();
+                    } else if (it->has_ready()) {
+                        it->call_has_ready();
+                    } else {
+                        int64_t ms_since_registration = MonotonicMillis() - 
it->registration_time();
+                        if (ms_since_registration > it->wait_time_ms()) {
+                            it->call_timeout();
+                        } else {
+                            new_que.push_back(std::move(it));
+                        }
+                    }
+                }
+                new_que.swap(_que);
+            }
+            std::this_thread::sleep_for(std::chrono::milliseconds(interval));
+        }
+    }
+    ~RuntimeFilterTimerQueue() { _thread.detach(); }
+    RuntimeFilterTimerQueue() { _thread = 
std::thread(&RuntimeFilterTimerQueue::start, this); }
+    static void push_filter_timer(std::shared_ptr<RuntimeFilterTimer> filter) {
+        static RuntimeFilterTimerQueue timer_que;
+
+        timer_que.push(filter);
+    }
+
+    void push(std::shared_ptr<RuntimeFilterTimer> filter) {
+        std::unique_lock<std::mutex> lc(_que_lock);
+        _que.push_back(filter);
+        cv.notify_all();
+    }
+
+    std::thread _thread;
+    std::condition_variable cv;
+    std::mutex cv_m;
+    std::mutex _que_lock;
+
+    std::list<std::shared_ptr<RuntimeFilterTimer>> _que;
+};
+
+void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) {

Review Comment:
   warning: method 'add_filters' can be made static 
[readability-convert-member-functions-to-static]
   
   be/src/pipeline/pipeline_x/dependency.h:249:
   ```diff
   -     void add_filters(IRuntimeFilter* runtime_filter);
   +     static void add_filters(IRuntimeFilter* runtime_filter);
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to