This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new aa75f79fad [fix](executor)cancel exchange buffer rpc when query is cancelled (#22226)
aa75f79fad is described below

commit aa75f79fad87ac845bb7552fd81ad83df6ab4759
Author: wangbo <wan...@apache.org>
AuthorDate: Thu Jul 27 14:38:25 2023 +0800

    [fix](executor)cancel exchange buffer rpc when query is cancelled (#22226)
    
    When a brpc client sends a request to a server that does not respond, and
    may never respond (for example, because the BE restarted), the query itself
    can be cancelled immediately, but the ExchangeSinkBuffer cannot be cancelled
    until the RPC times out.
    The goal is for the ExchangeSinkBuffer to be closed immediately once the
    query is cancelled.
---
 be/src/pipeline/exec/exchange_sink_buffer.cpp | 29 +++++++++++++++++++++++++--
 be/src/pipeline/exec/exchange_sink_buffer.h   |  9 ++++++++-
 2 files changed, 35 insertions(+), 3 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index e06bb24752..0a487922a2 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -22,7 +22,6 @@
 #include <butil/iobuf_inl.h>
 #include <fmt/format.h>
 #include <gen_cpp/Types_types.h>
-#include <gen_cpp/internal_service.pb.h>
 #include <glog/logging.h>
 #include <google/protobuf/stubs/callback.h>
 #include <stddef.h>
@@ -72,11 +71,25 @@ bool ExchangeSinkBuffer::can_write() const {
     return total_package_size <= max_package_size;
 }
 
-bool ExchangeSinkBuffer::is_pending_finish() const {
+bool ExchangeSinkBuffer::is_pending_finish() {
+    //note(wb) angly implementation here, because operator couples the 
scheduling logic
+    // graceful implementation maybe as follows:
+    // 1 make ExchangeSinkBuffer support try close which calls 
brpc::StartCancel
+    // 2 make BlockScheduler calls tryclose when query is cancel
+    bool need_cancel = _context->is_canceled();
+
     for (auto& pair : _instance_to_package_queue_mutex) {
         std::unique_lock<std::mutex> lock(*(pair.second));
         auto& id = pair.first;
         if (!_instance_to_sending_by_pipeline.at(id)) {
+            // when pending finish, we need check whether current query is 
cancelled
+            if (need_cancel && _instance_to_rpc_ctx.find(id) != 
_instance_to_rpc_ctx.end()) {
+                auto& rpc_ctx = _instance_to_rpc_ctx[id];
+                if (!rpc_ctx.is_cancelled) {
+                    brpc::StartCancel(rpc_ctx._closure->cntl.call_id());
+                    rpc_ctx.is_cancelled = true;
+                }
+            }
             return true;
         }
     }
@@ -177,6 +190,12 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
             brpc_request->set_allocated_block(request.block.get());
         }
         auto* closure = request.channel->get_closure(id, request.eos, nullptr);
+
+        ExchangeRpcContext rpc_ctx;
+        rpc_ctx._closure = closure;
+        rpc_ctx.is_cancelled = false;
+        _instance_to_rpc_ctx[id] = rpc_ctx;
+
         closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms);
         closure->addFailedHandler(
                 [&](const InstanceLoId& id, const std::string& err) { 
_failed(id, err); });
@@ -221,6 +240,12 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
             
brpc_request->set_allocated_block(request.block_holder->get_block());
         }
         auto* closure = request.channel->get_closure(id, request.eos, 
request.block_holder);
+
+        ExchangeRpcContext rpc_ctx;
+        rpc_ctx._closure = closure;
+        rpc_ctx.is_cancelled = false;
+        _instance_to_rpc_ctx[id] = rpc_ctx;
+
         closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms);
         closure->addFailedHandler(
                 [&](const InstanceLoId& id, const std::string& err) { 
_failed(id, err); });
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h 
b/be/src/pipeline/exec/exchange_sink_buffer.h
index dcea246f91..c463656310 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -19,6 +19,7 @@
 
 #include <brpc/controller.h>
 #include <gen_cpp/data.pb.h>
+#include <gen_cpp/internal_service.pb.h>
 #include <gen_cpp/types.pb.h>
 #include <parallel_hashmap/phmap.h>
 #include <stdint.h>
@@ -153,6 +154,11 @@ private:
     vectorized::BroadcastPBlockHolder* _data;
 };
 
+struct ExchangeRpcContext {
+    SelfDeleteClosure<PTransmitDataResult>* _closure = nullptr;
+    bool is_cancelled = false;
+};
+
 // Each ExchangeSinkOperator have one ExchangeSinkBuffer
 class ExchangeSinkBuffer {
 public:
@@ -162,7 +168,7 @@ public:
     Status add_block(TransmitInfo&& request);
     Status add_block(BroadcastTransmitInfo&& request);
     bool can_write() const;
-    bool is_pending_finish() const;
+    bool is_pending_finish();
     void close();
     void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t 
receive_rpc_time);
     void update_profile(RuntimeProfile* profile);
@@ -185,6 +191,7 @@ private:
     phmap::flat_hash_map<InstanceLoId, bool> _instance_to_sending_by_pipeline;
     phmap::flat_hash_map<InstanceLoId, bool> _instance_to_receiver_eof;
     phmap::flat_hash_map<InstanceLoId, int64_t> _instance_to_rpc_time;
+    phmap::flat_hash_map<InstanceLoId, ExchangeRpcContext> 
_instance_to_rpc_ctx;
 
     std::atomic<bool> _is_finishing;
     PUniqueId _query_id;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to