github-actions[bot] commented on code in PR #26078:
URL: https://github.com/apache/doris/pull/26078#discussion_r1375397388


##########
be/src/pipeline/pipeline_x/dependency.h:
##########
@@ -19,11 +19,16 @@
 
 #include <sqltypes.h>

Review Comment:
   warning: 'sqltypes.h' file not found [clang-diagnostic-error]
   ```cpp
   #include <sqltypes.h>
            ^
   ```
   



##########
be/src/pipeline/pipeline_x/dependency.h:
##########
@@ -194,30 +199,133 @@
     const int _node_id;
 };
 
-class FilterDependency final : public Dependency {
+struct RuntimeFilterTimerQueue {
+    constexpr static int64_t interval = 10;
+    ~RuntimeFilterTimerQueue() { _thread.detach(); }
+    RuntimeFilterTimerQueue() { _thread = 
std::thread(&RuntimeFilterTimerQueue::start, this); }
+    void start() {
+        while (true) {
+            std::unique_lock<std::mutex> lk(cv_m);
+
+            cv.wait(lk, [this] { return !_que.empty(); });
+
+            std::list<std::unique_ptr<TimerItem>> new_que;
+            for (auto& it : _que) {
+                if (!it->_has_ready) {
+                    int64_t ms_since_registration = MonotonicMillis() - 
it->_registration_time;
+                    if (ms_since_registration > it->_wait_times_ms) {
+                        it->_call_back();
+                    } else {
+                        new_que.push_back(std::move(it));
+                    }
+                }
+            }
+            new_que.swap(_que);
+            std::this_thread::sleep_for(std::chrono::milliseconds(interval));
+        }
+    }
+
+    static std::atomic_bool* push_filter_timer(int64_t registration_time, 
int64_t wait_times_ms,
+                                               std::function<void()> 
call_back) {
+        static RuntimeFilterTimerQueue que;
+
+        return que.push(registration_time, wait_times_ms, call_back);
+    }
+    std::atomic_bool* push(int64_t registration_time, int64_t wait_times_ms,
+                           std::function<void()> call_back) {
+        std::unique_ptr<TimerItem> item =
+                std::make_unique<TimerItem>(call_back, registration_time, 
wait_times_ms);
+        auto* ready_ptr = &item->_has_ready;
+        std::unique_lock<std::mutex> lc(cv_m);
+        _que.push_back(std::move(item));
+        cv.notify_all();
+        return ready_ptr;
+    }
+    std::thread _thread;
+    std::condition_variable cv;
+    std::mutex cv_m;
+
+    struct TimerItem {
+        TimerItem(std::function<void()> call_back, int64_t registration_time, 
int64_t wait_times_ms)
+                : _call_back(std::move(call_back)),
+                  _has_ready(false),
+                  _registration_time(registration_time),
+                  _wait_times_ms(wait_times_ms) {}
+        std::function<void()> _call_back;
+        std::atomic_bool _has_ready;
+        const int64_t _registration_time;
+        const int64_t _wait_times_ms;
+    };
+
+    std::list<std::unique_ptr<TimerItem>> _que;
+};
+class RuntimeFilterDependency;
+class FilterDependency {
+public:
+    FilterDependency(int64_t registration_time, int32_t wait_time_ms,
+                     RuntimeFilterDependency* parent)
+            : _parent(parent) {
+        auto call_back = [&]() { call_timeout_or_ready(); };
+
+        _hash_ready = 
RuntimeFilterTimerQueue::push_filter_timer(registration_time, wait_time_ms,
+                                                                 call_back);
+    }
+
+    void set_filter_ready() {
+        *_hash_ready = true;
+        call_timeout_or_ready();
+    }
+
+    void call_timeout_or_ready();
+
+private:
+    std::atomic_bool* _hash_ready;
+    bool _has_call {};
+    RuntimeFilterDependency* _parent;
+    std::mutex _lock;
+};
+class RuntimeFilterDependency final : public Dependency {
 public:
-    FilterDependency(int id, int node_id, std::string name)
-            : Dependency(id, name),
-              _runtime_filters_are_ready_or_timeout(nullptr),
-              _node_id(node_id) {}
+    RuntimeFilterDependency(int id, int node_id, std::string name)
+            : Dependency(id, name), _node_id(node_id) {}
 
-    FilterDependency* filter_blocked_by() {
+    RuntimeFilterDependency* filter_blocked_by() {
         if (!_runtime_filters_are_ready_or_timeout) {
             return nullptr;
         }
-        if (!_runtime_filters_are_ready_or_timeout()) {
-            return this;
+        if (_filters == 0) {
+            return nullptr;
         }
-        return nullptr;
+        return this;
     }
     void* shared_state() override { return nullptr; }
-    void set_filter_blocked_by_fn(std::function<bool()> call_fn) {
-        _runtime_filters_are_ready_or_timeout = call_fn;
+    void add_filters(IRuntimeFilter* runtime_filter) {

Review Comment:
   warning: method 'add_filters' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
       static void add_filters(IRuntimeFilter* runtime_filter) {
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to