This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a11a590368d [fix](pipeline) Fix query hang up if limited rows is
reached (#35513)
a11a590368d is described below
commit a11a590368da90975cc8195731f2ac8660f6a747
Author: Gabriel <[email protected]>
AuthorDate: Fri May 31 21:55:46 2024 +0800
[fix](pipeline) Fix query hang up if limited rows is reached (#35513)
Follow-up for #35466.
We should ensure that closed tasks do not block other tasks.
---
be/src/pipeline/exec/exchange_sink_buffer.cpp | 41 +++++++++++++---------
be/src/pipeline/exec/exchange_sink_buffer.h | 11 +++---
be/src/pipeline/exec/exchange_sink_operator.cpp | 9 +++--
be/src/pipeline/exec/exchange_sink_operator.h | 3 ++
be/src/pipeline/exec/operator.h | 8 +++++
.../local_exchange_source_operator.cpp | 8 +++++
be/src/pipeline/local_exchange/local_exchanger.cpp | 25 +++++++++++++
be/src/pipeline/local_exchange/local_exchanger.h | 4 +++
be/src/pipeline/pipeline_fragment_context.h | 8 +++++
be/src/pipeline/pipeline_task.cpp | 12 ++++---
be/src/pipeline/pipeline_task.h | 6 ++++
be/src/runtime/fragment_mgr.cpp | 3 ++
12 files changed, 109 insertions(+), 29 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index d66e5ebd680..8893db54cc5 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -84,7 +84,8 @@ std::shared_ptr<BroadcastPBlockHolder>
BroadcastPBlockHolderQueue::pop() {
namespace pipeline {
ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId
dest_node_id, int send_id,
- int be_number, RuntimeState* state)
+ int be_number, RuntimeState* state,
+ ExchangeSinkLocalState* parent)
: HasTaskExecutionCtx(state),
_queue_capacity(0),
_is_finishing(false),
@@ -93,9 +94,8 @@ ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id,
PlanNodeId dest_node_
_sender_id(send_id),
_be_number(be_number),
_state(state),
- _context(state->get_query_ctx()) {}
-
-ExchangeSinkBuffer::~ExchangeSinkBuffer() = default;
+ _context(state->get_query_ctx()),
+ _parent(parent) {}
void ExchangeSinkBuffer::close() {
// Could not clear the queue here, because there maybe a running rpc want
to
@@ -213,8 +213,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
_instance_to_broadcast_package_queue[id];
if (_is_finishing) {
- _rpc_channel_is_idle[id] = true;
- _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
+ _turn_off_channel(id);
return Status::OK();
}
@@ -372,8 +371,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
}
broadcast_q.pop();
} else {
- _rpc_channel_is_idle[id] = true;
- _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
+ _turn_off_channel(id);
}
return Status::OK();
@@ -403,26 +401,21 @@ void ExchangeSinkBuffer::_ended(InstanceLoId id) {
__builtin_unreachable();
} else {
std::unique_lock<std::mutex>
lock(*_instance_to_package_queue_mutex[id]);
- if (!_rpc_channel_is_idle[id]) {
- _rpc_channel_is_idle[id] = true;
- _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
- }
+ _turn_off_channel(id);
}
}
void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {
_is_finishing = true;
_context->cancel(Status::Cancelled(err));
- _ended(id);
+ std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
+ _turn_off_channel(id, true);
}
void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
_instance_to_receiver_eof[id] = true;
- if (!_rpc_channel_is_idle[id]) {
- _rpc_channel_is_idle[id] = true;
- _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
- }
+ _turn_off_channel(id, true);
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> empty;
swap(empty, _instance_to_broadcast_package_queue[id]);
}
@@ -432,6 +425,20 @@ bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id)
{
return _instance_to_receiver_eof[id];
}
+void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId id, bool cleanup) { // Marks channel `id` idle; when `cleanup` is set and it was the last busy channel, flags the parent as having reached its limit.
+ if (!_rpc_channel_is_idle[id]) { // An already-idle channel is skipped, so the busy counter is decremented at most once per idle transition.
+ _rpc_channel_is_idle[id] = true;
+ auto all_done = _busy_channels.fetch_sub(1) == 1; // fetch_sub returns the previous value, so 1 means this was the last busy channel.
+ _set_ready_to_finish(all_done);
+ if (cleanup && all_done) {
+ auto weak_task_ctx = weak_task_exec_ctx();
+ if (auto pip_ctx = weak_task_ctx.lock()) { // `_parent` is only touched while the task execution context can still be locked; presumably this guards the parent's lifetime - TODO confirm.
+ _parent->set_reach_limit();
+ }
+ }
+ }
+}
+
void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t*
min_time) {
int64_t local_max_time = 0;
int64_t local_min_time = INT64_MAX;
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h
b/be/src/pipeline/exec/exchange_sink_buffer.h
index 2955ff959de..8eed559e712 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -47,6 +47,7 @@ using InstanceLoId = int64_t;
namespace pipeline {
class Dependency;
+class ExchangeSinkLocalState;
} // namespace pipeline
namespace vectorized {
@@ -192,11 +193,11 @@ struct ExchangeRpcContext {
};
// Each ExchangeSinkOperator have one ExchangeSinkBuffer
-class ExchangeSinkBuffer : public HasTaskExecutionCtx {
+class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
public:
ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int
send_id, int be_number,
- RuntimeState* state);
- ~ExchangeSinkBuffer();
+ RuntimeState* state, ExchangeSinkLocalState* parent);
+ ~ExchangeSinkBuffer() = default;
void register_sink(TUniqueId);
Status add_block(TransmitInfo&& request);
@@ -265,6 +266,7 @@ private:
inline void _failed(InstanceLoId id, const std::string& err);
inline void _set_receiver_eof(InstanceLoId id);
inline bool _is_receiver_eof(InstanceLoId id);
+ inline void _turn_off_channel(InstanceLoId id, bool cleanup = false);
void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time);
int64_t get_sum_rpc_time();
@@ -272,7 +274,8 @@ private:
std::shared_ptr<Dependency> _queue_dependency = nullptr;
std::shared_ptr<Dependency> _finish_dependency = nullptr;
std::shared_ptr<Dependency> _broadcast_dependency = nullptr;
- std::atomic<bool> _should_stop {false};
+ std::atomic<bool> _should_stop = false;
+ ExchangeSinkLocalState* _parent = nullptr;
};
} // namespace pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 244184bc7a3..67994c85cfa 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -132,7 +132,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
id.set_hi(_state->query_id().hi);
id.set_lo(_state->query_id().lo);
_sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, p._dest_node_id,
_sender_id,
- _state->be_number(),
state);
+ _state->be_number(),
state, this);
register_channels(_sink_buffer.get());
_queue_dependency = Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
@@ -624,8 +624,11 @@ Status ExchangeSinkOperatorX::channel_add_rows_with_idx(
std::string ExchangeSinkLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}",
Base::debug_string(indentation_level));
- fmt::format_to(debug_string_buffer, ", Sink Buffer: (_should_stop = {},
_busy_channels = {})",
- _sink_buffer->_should_stop.load(),
_sink_buffer->_busy_channels.load());
+ fmt::format_to(debug_string_buffer,
+ ", Sink Buffer: (_should_stop = {}, _busy_channels = {},
_is_finishing = {}), "
+ "_reach_limit: {}",
+ _sink_buffer->_should_stop.load(),
_sink_buffer->_busy_channels.load(),
+ _sink_buffer->_is_finishing.load(), _reach_limit.load());
return fmt::to_string(debug_string_buffer);
}
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h
b/be/src/pipeline/exec/exchange_sink_operator.h
index 72fdbc3354d..fa72db6702f 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -100,6 +100,8 @@ public:
RuntimeProfile::Counter* compress_timer() { return _compress_timer; }
RuntimeProfile::Counter* uncompressed_bytes_counter() { return
_uncompressed_bytes_counter; }
[[nodiscard]] bool transfer_large_data_by_brpc() const;
+ bool is_finished() const override { return _reach_limit.load(); }
+ void set_reach_limit() { _reach_limit = true; };
[[nodiscard]] int sender_id() const { return _sender_id; }
@@ -199,6 +201,7 @@ private:
// for external table sink hash partition
std::unique_ptr<HashPartitionFunction> _partition_function = nullptr;
+ std::atomic<bool> _reach_limit = false;
};
class ExchangeSinkOperatorX final : public
DataSinkOperatorX<ExchangeSinkLocalState> {
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index a24a58f883b..4eef9589a23 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -312,6 +312,7 @@ public:
// idempotent (e.g. wait for runtime filters).
virtual Status open(RuntimeState* state) = 0;
virtual Status close(RuntimeState* state, Status exec_status) = 0;
+ [[nodiscard]] virtual bool is_finished() const { return false; }
[[nodiscard]] virtual std::string debug_string(int indentation_level)
const = 0;
@@ -445,6 +446,13 @@ public:
Status prepare(RuntimeState* state) override { return Status::OK(); }
Status open(RuntimeState* state) override { return Status::OK(); }
+ [[nodiscard]] bool is_finished(RuntimeState* state) const { // Reports whether this sink's local state (looked up through `state`) considers itself finished.
+ auto result = state->get_sink_local_state_result();
+ if (!result) {
+ return result.error(); // NOTE(review): the error object is converted to bool here; the value depends on that conversion - confirm that "not finished" (false) is what is intended.
+ }
+ return result.value()->is_finished(); // Delegates to the local state's own is_finished().
+ }
[[nodiscard]] virtual Status sink(RuntimeState* state, vectorized::Block*
block, bool eos) = 0;
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
index ea9f7861dbf..1c8dff51a29 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
@@ -52,6 +52,9 @@ Status LocalExchangeSourceLocalState::close(RuntimeState*
state) {
return Status::OK();
}
+ if (_exchanger) {
+ _exchanger->close(*this);
+ }
if (_shared_state) {
_shared_state->sub_running_source_operators();
}
@@ -67,6 +70,11 @@ std::string LocalExchangeSourceLocalState::debug_string(int
indentation_level) c
Base::debug_string(indentation_level), _channel_id,
_exchanger->_num_partitions,
_exchanger->_num_senders, _exchanger->_num_sources,
_exchanger->_running_sink_operators,
_exchanger->_running_source_operators);
+ size_t i = 0;
+ fmt::format_to(debug_string_buffer, ", MemTrackers: ");
+ for (auto* mem_tracker : _shared_state->mem_trackers) {
+ // Post-increment the index so each tracker is labelled with its own position instead of always 0.
+ fmt::format_to(debug_string_buffer, "{}: {}, ", i++, mem_tracker->consumption());
+ }
return fmt::to_string(debug_string_buffer);
}
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index 97310948307..1c03b28415d 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -39,6 +39,16 @@ Status ShuffleExchanger::sink(RuntimeState* state,
vectorized::Block* in_block,
return Status::OK();
}
+void ShuffleExchanger::close(LocalExchangeSourceLocalState& local_state) { // Drains the data queue of this source's channel, releasing the accounting held by each pending block.
+ PartitionedBlock partitioned_block;
+ while
(_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) { // Loops until try_dequeue reports nothing left for this channel.
+ auto block_wrapper = partitioned_block.first;
+ local_state._shared_state->sub_mem_usage( // NOTE(review): `_shared_state` is dereferenced unchecked, while the caller's close() null-checks it only after this call - confirm it cannot be null here.
+ local_state._channel_id,
block_wrapper->data_block.allocated_bytes(), false);
+ block_wrapper->unref(local_state._shared_state); // Drops this channel's reference on the wrapped block.
+ }
+}
+
Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block*
block, bool* eos,
LocalExchangeSourceLocalState& local_state)
{
PartitionedBlock partitioned_block;
@@ -182,6 +192,14 @@ Status PassthroughExchanger::sink(RuntimeState* state,
vectorized::Block* in_blo
return Status::OK();
}
+void PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) { // Drains the data queue of this source's channel and subtracts each dropped block from the channel's memory usage.
+ vectorized::Block next_block;
+ while (_data_queue[local_state._channel_id].try_dequeue(next_block)) { // Loops until try_dequeue reports nothing left for this channel.
+ local_state._shared_state->sub_mem_usage(local_state._channel_id, // NOTE(review): `_shared_state` is dereferenced unchecked - confirm it cannot be null here.
+ next_block.allocated_bytes());
+ }
+}
+
Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block*
block, bool* eos,
LocalExchangeSourceLocalState&
local_state) {
vectorized::Block next_block;
@@ -255,6 +273,13 @@ Status BroadcastExchanger::sink(RuntimeState* state,
vectorized::Block* in_block
return Status::OK();
}
+void BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) { // Drains the data queue of this source's channel, discarding every pending block.
+ vectorized::Block next_block;
+ while (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+ // Nothing to do per block: dequeuing is the whole point, and no memory-usage bookkeeping is updated here.
+ }
+}
+
Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block*
block, bool* eos,
LocalExchangeSourceLocalState&
local_state) {
vectorized::Block next_block;
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h
b/be/src/pipeline/local_exchange/local_exchanger.h
index 476f479e11e..bc07c806094 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -48,6 +48,7 @@ public:
virtual Status sink(RuntimeState* state, vectorized::Block* in_block, bool
eos,
LocalExchangeSinkLocalState& local_state) = 0;
virtual ExchangeType get_type() const = 0;
+ virtual void close(LocalExchangeSourceLocalState& local_state) {}
protected:
friend struct LocalExchangeSharedState;
@@ -108,6 +109,7 @@ public:
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
LocalExchangeSourceLocalState& local_state) override;
+ void close(LocalExchangeSourceLocalState& local_state) override;
ExchangeType get_type() const override { return
ExchangeType::HASH_SHUFFLE; }
protected:
@@ -150,6 +152,7 @@ public:
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
LocalExchangeSourceLocalState& local_state) override;
ExchangeType get_type() const override { return ExchangeType::PASSTHROUGH;
}
+ void close(LocalExchangeSourceLocalState& local_state) override;
private:
std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
@@ -188,6 +191,7 @@ public:
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
LocalExchangeSourceLocalState& local_state) override;
ExchangeType get_type() const override { return ExchangeType::BROADCAST; }
+ void close(LocalExchangeSourceLocalState& local_state) override;
private:
std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index 8bc1eb29139..94dd96731c2 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -131,6 +131,14 @@ public:
}
}
+ void clear_finished_tasks() { // Visits every task of every pipeline and asks it to stop if it has finished.
+ for (size_t j = 0; j < _tasks.size(); j++) { // Outer index walks the first dimension of `_tasks`.
+ for (size_t i = 0; i < _tasks[j].size(); i++) { // Inner index walks the tasks stored under `_tasks[j]`.
+ _tasks[j][i]->stop_if_finished();
+ }
+ }
+ };
+
private:
Status _build_pipelines(ObjectPool* pool, const
doris::TPipelineFragmentParams& request,
const DescriptorTbl& descs, OperatorXPtr* root,
PipelinePtr cur_pipe);
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index d26b0fce387..52a76828804 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -273,6 +273,7 @@ Status PipelineTask::execute(bool* eos) {
SCOPED_TIMER(_task_profile->total_time_counter());
SCOPED_TIMER(_exec_timer);
SCOPED_ATTACH_TASK(_state);
+ _eos = _sink->is_finished(_state) || _eos;
*eos = _eos;
if (_eos) {
// If task is waken up by finish dependency, `_eos` is set to true by
last execution, and we should return here.
@@ -334,14 +335,15 @@ Status PipelineTask::execute(bool* eos) {
Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task
executing failed");
return status;
});
- // Pull block from operator chain
- if (!_dry_run) {
+ // `_dry_run` means sink operator need no more data
+ // `_sink->is_finished(_state)` means sink operator should be finished
+ if (_dry_run || _sink->is_finished(_state)) {
+ *eos = true;
+ _eos = true;
+ } else {
SCOPED_TIMER(_get_block_timer);
_get_block_counter->update(1);
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_root->get_block_after_projects(_state,
block, eos));
- } else {
- *eos = true;
- _eos = true;
}
if (_block->rows() != 0 || *eos) {
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index bb6587eec28..6bc65905be6 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -222,6 +222,12 @@ public:
std::string task_name() const { return fmt::format("task{}({})", _index,
_pipeline->_name); }
+ void stop_if_finished() { // Clears this task's blocking state once its sink reports that it is finished.
+ if (_sink->is_finished(_state)) {
+ clear_blocking_state(); // presumably lets a blocked task be scheduled again so it can observe the finished state - TODO confirm.
+ }
+ }
+
private:
friend class RuntimeFilterDependency;
bool _is_blocked();
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 852179e98e9..69fff2951ef 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1046,6 +1046,9 @@ void FragmentMgr::cancel_worker() {
to_cancel.push_back(fragment_instance_itr.second->fragment_instance_id());
}
}
+ for (auto& pipeline_itr : _pipeline_map) {
+ pipeline_itr.second->clear_finished_tasks();
+ }
for (auto it = _query_ctx_map.begin(); it !=
_query_ctx_map.end();) {
if (auto q_ctx = it->second.lock()) {
if (q_ctx->is_timeout(now) &&
q_ctx->enable_pipeline_x_exec()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]