This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new aa75f79fad [fix](executor)cancel exchange buffer rpc when query is cancelled (#22226) aa75f79fad is described below commit aa75f79fad87ac845bb7552fd81ad83df6ab4759 Author: wangbo <wan...@apache.org> AuthorDate: Thu Jul 27 14:38:25 2023 +0800 [fix](executor)cancel exchange buffer rpc when query is cancelled (#22226) When a brpc client makes a request to a server and the server does not respond, and may never respond (for example, because the BE restarted), the query can be cancelled at once, but the ExchangeSinkBuffer cannot be cancelled until the RPC times out. So when the query is cancelled, the ExchangeSinkBuffer should also be closed at once. --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 29 +++++++++++++++++++++++++-- be/src/pipeline/exec/exchange_sink_buffer.h | 9 ++++++++- 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index e06bb24752..0a487922a2 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -22,7 +22,6 @@ #include <butil/iobuf_inl.h> #include <fmt/format.h> #include <gen_cpp/Types_types.h> -#include <gen_cpp/internal_service.pb.h> #include <glog/logging.h> #include <google/protobuf/stubs/callback.h> #include <stddef.h> @@ -72,11 +71,25 @@ bool ExchangeSinkBuffer::can_write() const { return total_package_size <= max_package_size; } -bool ExchangeSinkBuffer::is_pending_finish() const { +bool ExchangeSinkBuffer::is_pending_finish() { + //note(wb) angly implementation here, because operator couples the scheduling logic + // graceful implementation maybe as follows: + // 1 make ExchangeSinkBuffer support try close which calls brpc::StartCancel + // 2 make BlockScheduler calls tryclose when query is cancel + bool need_cancel = _context->is_canceled(); + for (auto& pair : _instance_to_package_queue_mutex) { 
std::unique_lock<std::mutex> lock(*(pair.second)); auto& id = pair.first; if (!_instance_to_sending_by_pipeline.at(id)) { + // when pending finish, we need check whether current query is cancelled + if (need_cancel && _instance_to_rpc_ctx.find(id) != _instance_to_rpc_ctx.end()) { + auto& rpc_ctx = _instance_to_rpc_ctx[id]; + if (!rpc_ctx.is_cancelled) { + brpc::StartCancel(rpc_ctx._closure->cntl.call_id()); + rpc_ctx.is_cancelled = true; + } + } return true; } } @@ -177,6 +190,12 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { brpc_request->set_allocated_block(request.block.get()); } auto* closure = request.channel->get_closure(id, request.eos, nullptr); + + ExchangeRpcContext rpc_ctx; + rpc_ctx._closure = closure; + rpc_ctx.is_cancelled = false; + _instance_to_rpc_ctx[id] = rpc_ctx; + closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms); closure->addFailedHandler( [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); }); @@ -221,6 +240,12 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { brpc_request->set_allocated_block(request.block_holder->get_block()); } auto* closure = request.channel->get_closure(id, request.eos, request.block_holder); + + ExchangeRpcContext rpc_ctx; + rpc_ctx._closure = closure; + rpc_ctx.is_cancelled = false; + _instance_to_rpc_ctx[id] = rpc_ctx; + closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms); closure->addFailedHandler( [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); }); diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index dcea246f91..c463656310 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -19,6 +19,7 @@ #include <brpc/controller.h> #include <gen_cpp/data.pb.h> +#include <gen_cpp/internal_service.pb.h> #include <gen_cpp/types.pb.h> #include <parallel_hashmap/phmap.h> #include <stdint.h> @@ -153,6 +154,11 @@ private: 
vectorized::BroadcastPBlockHolder* _data; }; +struct ExchangeRpcContext { + SelfDeleteClosure<PTransmitDataResult>* _closure = nullptr; + bool is_cancelled = false; +}; + // Each ExchangeSinkOperator have one ExchangeSinkBuffer class ExchangeSinkBuffer { public: @@ -162,7 +168,7 @@ public: Status add_block(TransmitInfo&& request); Status add_block(BroadcastTransmitInfo&& request); bool can_write() const; - bool is_pending_finish() const; + bool is_pending_finish(); void close(); void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_rpc_time); void update_profile(RuntimeProfile* profile); @@ -185,6 +191,7 @@ private: phmap::flat_hash_map<InstanceLoId, bool> _instance_to_sending_by_pipeline; phmap::flat_hash_map<InstanceLoId, bool> _instance_to_receiver_eof; phmap::flat_hash_map<InstanceLoId, int64_t> _instance_to_rpc_time; + phmap::flat_hash_map<InstanceLoId, ExchangeRpcContext> _instance_to_rpc_ctx; std::atomic<bool> _is_finishing; PUniqueId _query_id; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org