This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 95591ce49a [refactor](cv)wait on condition variable more gently (#12620)
95591ce49a is described below

commit 95591ce49a3215d3223748ed219b2441481b4678
Author: starocean999 <40539150+starocean...@users.noreply.github.com>
AuthorDate: Tue Nov 8 08:40:31 2022 +0800

    [refactor](cv)wait on condition variable more gently (#12620)
---
 be/src/agent/task_worker_pool.cpp        |  85 ++++++++-----------
 be/src/util/blocking_priority_queue.hpp  | 135 ++++++++++++++-----------------
 be/src/util/blocking_queue.hpp           |  58 +++++--------
 be/src/util/threadpool.cpp               |  17 ++--
 be/src/vec/exec/scan/scanner_context.cpp |  15 ++--
 5 files changed, 127 insertions(+), 183 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 5e482ef90f..3f5ca98021 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -341,9 +341,8 @@ void TaskWorkerPool::_create_tablet_worker_thread_callback() {
         TCreateTabletReq create_tablet_req;
         {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
@@ -416,9 +415,8 @@ void TaskWorkerPool::_drop_tablet_worker_thread_callback() {
         TDropTabletReq drop_tablet_req;
         {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
@@ -469,9 +467,8 @@ void TaskWorkerPool::_alter_tablet_worker_thread_callback() {
         TAgentTaskRequest agent_task_req;
         {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
@@ -592,9 +589,8 @@ void TaskWorkerPool::_push_worker_thread_callback() {
         int32_t index = 0;
         do {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
@@ -664,9 +660,8 @@ void TaskWorkerPool::_publish_version_worker_thread_callback() {
         TPublishVersionRequest publish_version_req;
         {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
@@ -773,9 +768,8 @@ void TaskWorkerPool::_clear_transaction_task_worker_thread_callback() {
         TClearTransactionTaskRequest clear_transaction_task_req;
         {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
@@ -826,9 +820,8 @@ void TaskWorkerPool::_update_tablet_meta_worker_thread_callback() {
         TUpdateTabletMetaInfoReq update_tablet_meta_req;
         {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
@@ -902,9 +895,8 @@ void TaskWorkerPool::_clone_worker_thread_callback() {
 
         {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
@@ -952,9 +944,8 @@ void TaskWorkerPool::_storage_medium_migrate_worker_thread_callback() {
         TStorageMediumMigrateReq storage_medium_migrate_req;
         {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
@@ -1054,9 +1045,8 @@ void TaskWorkerPool::_check_consistency_worker_thread_callback() {
         TCheckConsistencyReq check_consistency_req;
         {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
@@ -1248,9 +1238,8 @@ void TaskWorkerPool::_upload_worker_thread_callback() {
         TUploadReq upload_request;
         {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
@@ -1300,9 +1289,8 @@ void TaskWorkerPool::_download_worker_thread_callback() {
         TDownloadReq download_request;
         {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
@@ -1353,9 +1341,8 @@ void TaskWorkerPool::_make_snapshot_thread_callback() {
         TSnapshotRequest snapshot_request;
         {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
@@ -1413,9 +1400,8 @@ void TaskWorkerPool::_release_snapshot_thread_callback() {
         TReleaseSnapshotRequest release_snapshot_request;
         {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
@@ -1463,9 +1449,8 @@ void TaskWorkerPool::_move_dir_thread_callback() {
         TMoveDirReq move_dir_req;
         {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
@@ -1571,9 +1556,8 @@ void TaskWorkerPool::_submit_table_compaction_worker_thread_callback() {
 
         {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
@@ -1673,9 +1657,8 @@ void TaskWorkerPool::_storage_update_storage_policy_worker_thread_callback() {
         TGetStoragePolicy get_storage_policy_req;
         {
             std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
-            while (_is_work && _tasks.empty()) {
-                _worker_thread_condition_variable.wait(worker_thread_lock);
-            }
+            _worker_thread_condition_variable.wait(
+                    worker_thread_lock, [this]() { return !_is_work || !_tasks.empty(); });
             if (!_is_work) {
                 return;
             }
diff --git a/be/src/util/blocking_priority_queue.hpp b/be/src/util/blocking_priority_queue.hpp
index 8c264e57dd..29060613c8 100644
--- a/be/src/util/blocking_priority_queue.hpp
+++ b/be/src/util/blocking_priority_queue.hpp
@@ -49,101 +49,90 @@ public:
     // -- timeout_ms: 0 means wait indefinitely
     bool blocking_get(T* out, uint32_t timeout_ms = 0) {
         MonotonicStopWatch timer;
+        timer.start();
         std::unique_lock<std::mutex> unique_lock(_lock);
-
-        while (true) {
-            if (!_queue.empty()) {
-                // 定期提高队列中残留的任务优先级
-                // 保证优先级较低的大查询不至于完全饿死
-                if (_upgrade_counter > config::priority_queue_remaining_tasks_increased_frequency) {
-                    std::priority_queue<T> tmp_queue;
-                    while (!_queue.empty()) {
-                        T v = _queue.top();
-                        _queue.pop();
-                        ++v;
-                        tmp_queue.push(v);
-                    }
-                    swap(_queue, tmp_queue);
-                    _upgrade_counter = 0;
+        bool wait_successful = false;
+        if (timeout_ms > 0) {
+            wait_successful = _get_cv.wait_for(unique_lock, std::chrono::milliseconds(timeout_ms),
+                                               [this] { return _shutdown || !_queue.empty(); });
+        } else {
+            _get_cv.wait(unique_lock, [this] { return _shutdown || !_queue.empty(); });
+            wait_successful = true;
+        }
+        _total_get_wait_time += timer.elapsed_time();
+        if (wait_successful) {
+            if (_upgrade_counter > config::priority_queue_remaining_tasks_increased_frequency) {
+                std::priority_queue<T> tmp_queue;
+                while (!_queue.empty()) {
+                    T v = _queue.top();
+                    _queue.pop();
+                    ++v;
+                    tmp_queue.push(v);
                 }
+                swap(_queue, tmp_queue);
+                _upgrade_counter = 0;
+            }
+            if (!_queue.empty()) {
                 *out = _queue.top();
                 _queue.pop();
                 ++_upgrade_counter;
-                _total_get_wait_time += timer.elapsed_time();
-                unique_lock.unlock();
                 _put_cv.notify_one();
                 return true;
-            }
-            if (_shutdown) {
-                return false;
-            }
-
-            timer.start();
-            if (timeout_ms != 0) {
-                if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(timeout_ms)) ==
-                    std::cv_status::timeout) {
-                    return false;
-                }
             } else {
-                _get_cv.wait(unique_lock);
+                assert(_shutdown);
+                return false;
             }
-            timer.stop();
+        } else {
+            //time out
+            assert(!_shutdown);
+            return false;
         }
     }
 
     bool non_blocking_get(T* out) {
         MonotonicStopWatch timer;
+        timer.start();
         std::unique_lock<std::mutex> unique_lock(_lock);
 
-        while (true) {
-            if (!_queue.empty()) {
-                // 定期提高队列中残留的任务优先级
-                // 保证优先级较低的大查询不至于完全饿死
-                if (_upgrade_counter > config::priority_queue_remaining_tasks_increased_frequency) {
-                    std::priority_queue<T> tmp_queue;
-                    while (!_queue.empty()) {
-                        T v = _queue.top();
-                        _queue.pop();
-                        ++v;
-                        tmp_queue.push(v);
-                    }
-                    swap(_queue, tmp_queue);
-                    _upgrade_counter = 0;
+        if (!_queue.empty()) {
+            // 定期提高队列中残留的任务优先级
+            // 保证优先级较低的大查询不至于完全饿死
+            if (_upgrade_counter > config::priority_queue_remaining_tasks_increased_frequency) {
+                std::priority_queue<T> tmp_queue;
+                while (!_queue.empty()) {
+                    T v = _queue.top();
+                    _queue.pop();
+                    ++v;
+                    tmp_queue.push(v);
                 }
-                *out = _queue.top();
-                _queue.pop();
-                ++_upgrade_counter;
-                _total_get_wait_time += timer.elapsed_time();
-                unique_lock.unlock();
-                _put_cv.notify_one();
-                return true;
+                swap(_queue, tmp_queue);
+                _upgrade_counter = 0;
             }
-            if (_shutdown) {
-                return false;
-            }
-            return false;
+            *out = _queue.top();
+            _queue.pop();
+            ++_upgrade_counter;
+            _total_get_wait_time += timer.elapsed_time();
+            _put_cv.notify_one();
+            return true;
         }
+
+        return false;
     }
 
     // Puts an element into the queue, waiting indefinitely until there is space.
     // If the queue is shut down, returns false.
     bool blocking_put(const T& val) {
         MonotonicStopWatch timer;
+        timer.start();
         std::unique_lock<std::mutex> unique_lock(_lock);
-
-        while (_queue.size() >= _max_element && !_shutdown) {
-            timer.start();
-            _put_cv.wait(unique_lock);
-            timer.stop();
-        }
+        _put_cv.wait(unique_lock, [this] { return _shutdown || _queue.size() < _max_element; });
         _total_put_wait_time += timer.elapsed_time();
+
         if (_shutdown) {
             return false;
         }
 
-        DCHECK_LT(_queue.size(), _max_element);
         _queue.push(val);
-        unique_lock.unlock();
         _get_cv.notify_one();
         return true;
     }
@@ -151,7 +140,7 @@ public:
     // Shut down the queue. Wakes up all threads waiting on blocking_get or blocking_put.
     void shutdown() {
         {
-            std::unique_lock<std::mutex> l(_lock);
+            std::lock_guard<std::mutex> l(_lock);
             _shutdown = true;
         }
         _get_cv.notify_all();
@@ -159,24 +148,18 @@ public:
     }
 
     uint32_t get_size() const {
-        std::unique_lock<std::mutex> l(_lock);
+        std::lock_guard<std::mutex> l(_lock);
         return _queue.size();
     }
 
     // Returns the total amount of time threads have blocked in blocking_get.
-    uint64_t total_get_wait_time() const {
-        std::lock_guard<std::mutex> guard(_lock);
-        return _total_get_wait_time;
-    }
+    uint64_t total_get_wait_time() const { return _total_get_wait_time; }
 
     // Returns the total amount of time threads have blocked in blocking_put.
-    uint64_t total_put_wait_time() const {
-        std::lock_guard<std::mutex> guard(_lock);
-        return _total_put_wait_time;
-    }
+    uint64_t total_put_wait_time() const { return _total_put_wait_time; }
 
 private:
-    std::atomic<bool> _shutdown;
+    bool _shutdown;
     const int _max_element;
     std::condition_variable _get_cv; // 'get' callers wait on this
     std::condition_variable _put_cv; // 'put' callers wait on this
@@ -184,8 +167,8 @@ private:
     mutable std::mutex _lock;
     std::priority_queue<T> _queue;
     int _upgrade_counter;
-    uint64_t _total_get_wait_time;
-    uint64_t _total_put_wait_time;
+    std::atomic<uint64_t> _total_get_wait_time;
+    std::atomic<uint64_t> _total_put_wait_time;
 };
 
 } // namespace doris
diff --git a/be/src/util/blocking_queue.hpp b/be/src/util/blocking_queue.hpp
index 7ba33c37f1..dff6911edf 100644
--- a/be/src/util/blocking_queue.hpp
+++ b/be/src/util/blocking_queue.hpp
@@ -22,6 +22,7 @@
 
 #include <unistd.h>
 
+#include <atomic>
 #include <condition_variable>
 #include <list>
 #include <mutex>
@@ -47,25 +48,19 @@ public:
     // are no more elements available.
     bool blocking_get(T* out) {
         MonotonicStopWatch timer;
+        timer.start();
         std::unique_lock<std::mutex> unique_lock(_lock);
-
-        while (true) {
-            if (!_list.empty()) {
-                *out = _list.front();
-                _list.pop_front();
-                _total_get_wait_time += timer.elapsed_time();
-                unique_lock.unlock();
-                _put_cv.notify_one();
-                return true;
-            }
-
-            if (_shutdown) {
-                return false;
-            }
-
-            timer.start();
-            _get_cv.wait(unique_lock);
-            timer.stop();
+        _get_cv.wait(unique_lock, [this] { return _shutdown || !_list.empty(); });
+        _total_get_wait_time += timer.elapsed_time();
+
+        if (!_list.empty()) {
+            *out = _list.front();
+            _list.pop_front();
+            _put_cv.notify_one();
+            return true;
+        } else {
+            assert(_shutdown);
+            return false;
         }
     }
 
@@ -104,23 +99,16 @@ public:
     // If the queue is shut down, returns false.
     bool blocking_put(const T& val) {
         MonotonicStopWatch timer;
+        timer.start();
         std::unique_lock<std::mutex> unique_lock(_lock);
-
-        while (_list.size() >= _max_elements && !_shutdown) {
-            timer.start();
-            _put_cv.wait(unique_lock);
-            timer.stop();
-        }
-
+        _put_cv.wait(unique_lock, [this] { return _shutdown || _list.size() < _max_elements; });
         _total_put_wait_time += timer.elapsed_time();
 
         if (_shutdown) {
             return false;
         }
 
-        DCHECK_LT(_list.size(), _max_elements);
         _list.push_back(val);
-        unique_lock.unlock();
         _get_cv.notify_one();
         return true;
     }
@@ -137,21 +125,15 @@ public:
     }
 
     uint32_t get_size() const {
-        std::unique_lock<std::mutex> l(_lock);
+        std::lock_guard<std::mutex> l(_lock);
         return _list.size();
     }
 
     // Returns the total amount of time threads have blocked in BlockingGet.
-    uint64_t total_get_wait_time() const {
-        std::lock_guard<std::mutex> guard(_lock);
-        return _total_get_wait_time;
-    }
+    uint64_t total_get_wait_time() const { return _total_get_wait_time; }
 
     // Returns the total amount of time threads have blocked in BlockingPut.
-    uint64_t total_put_wait_time() const {
-        std::lock_guard<std::mutex> guard(_lock);
-        return _total_put_wait_time;
-    }
+    uint64_t total_put_wait_time() const { return _total_put_wait_time; }
 
 private:
     uint32_t SizeLocked(const std::unique_lock<std::mutex>& lock) const {
@@ -167,8 +149,8 @@ private:
     // _lock guards access to _list, total_get_wait_time, and total_put_wait_time
     mutable std::mutex _lock;
     std::list<T> _list;
-    uint64_t _total_get_wait_time;
-    uint64_t _total_put_wait_time;
+    std::atomic<uint64_t> _total_get_wait_time;
+    std::atomic<uint64_t> _total_put_wait_time;
 };
 
 } // namespace doris
diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp
index 881984ba38..76bc7e2171 100644
--- a/be/src/util/threadpool.cpp
+++ b/be/src/util/threadpool.cpp
@@ -148,9 +148,7 @@ void ThreadPoolToken::shutdown() {
     case State::QUIESCING:
         // The token is already quiescing. Just wait for a worker thread to
         // switch it to QUIESCED.
-        while (state() != State::QUIESCED) {
-            _not_running_cond.wait(l);
-        }
+        _not_running_cond.wait(l, [this]() { return state() == State::QUIESCED; });
         break;
     default:
         break;
@@ -160,9 +158,7 @@ void ThreadPoolToken::shutdown() {
 void ThreadPoolToken::wait() {
     std::unique_lock<std::mutex> l(_pool->_lock);
     _pool->check_not_pool_thread_unlocked();
-    while (is_active()) {
-        _not_running_cond.wait(l);
-    }
+    _not_running_cond.wait(l, [this]() { return !is_active(); });
 }
 
 void ThreadPoolToken::transition(State new_state) {
@@ -320,9 +316,8 @@ void ThreadPool::shutdown() {
         _idle_threads.front().not_empty.notify_one();
         _idle_threads.pop_front();
     }
-    while (_num_threads + _num_threads_pending_start > 0) {
-        _no_threads_cond.wait(l);
-    }
+
+    _no_threads_cond.wait(l, [this]() { return _num_threads + _num_threads_pending_start == 0; });
 
     // All the threads have exited. Check the state of each token.
     for (auto* t : _tokens) {
@@ -465,9 +460,7 @@ Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token
 void ThreadPool::wait() {
     std::unique_lock<std::mutex> l(_lock);
     check_not_pool_thread_unlocked();
-    while (_total_queued_tasks > 0 || _active_threads > 0) {
-        _idle_cond.wait(l);
-    }
+    _idle_cond.wait(l, [this]() { return _total_queued_tasks == 0 && _active_threads == 0; });
 }
 
 void ThreadPool::dispatch_thread() {
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 45e106039f..7f6cd847bb 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -116,13 +116,16 @@ Status ScannerContext::get_block_from_queue(vectorized::Block** block, bool* eos
         _state->exec_env()->scanner_scheduler()->submit(this);
     }
     // Wait for block from queue
-    while (_process_status.ok() && !_is_finished && blocks_queue.empty()) {
-        if (_state->is_cancelled()) {
-            _process_status = Status::Cancelled("cancelled");
-            break;
-        }
+    {
         SCOPED_TIMER(_parent->_scanner_wait_batch_timer);
-        _blocks_queue_added_cv.wait_for(l, std::chrono::seconds(1));
+        _blocks_queue_added_cv.wait(l, [this]() {
+            return !blocks_queue.empty() || _is_finished || !_process_status.ok() ||
+                   _state->is_cancelled();
+        });
+    }
+
+    if (_state->is_cancelled()) {
+        _process_status = Status::Cancelled("cancelled");
     }
 
     if (!_process_status.ok()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to