This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new eee5e6290d1 [bugfix](be) exchange sink buffer may use wrong runtime
state (#50588)
eee5e6290d1 is described below
commit eee5e6290d14e7e998c0b8eced9861956d59f0a7
Author: yiguolei <[email protected]>
AuthorDate: Tue May 6 14:47:18 2025 +0800
[bugfix](be) exchange sink buffer may use wrong runtime state (#50588)
### What problem does this PR solve?
1. The exchange sink buffer may be shared among different pipelines. In this
case, it should use the pipeline fragment ctx's runtime state.
2. In other cases, it should use the local runtime state.
---
be/src/pipeline/exec/exchange_sink_buffer.h | 1 +
be/src/pipeline/exec/exchange_sink_operator.cpp | 14 +++++++-------
be/src/pipeline/exec/exchange_sink_operator.h | 11 +++++++----
be/src/pipeline/pipeline_fragment_context.h | 2 --
be/test/pipeline/operator/exchange_sink_operator_test.cpp | 2 +-
5 files changed, 16 insertions(+), 14 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h
b/be/src/pipeline/exec/exchange_sink_buffer.h
index 44416ef68e1..5c650cc5132 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -310,6 +310,7 @@ private:
PlanNodeId _node_id;
std::atomic<int64_t> _rpc_count = 0;
+ // The state may be from PipelineFragmentContext if it is shared among
multi instances.
RuntimeState* _state = nullptr;
QueryContext* _context = nullptr;
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index e944cb78f67..ebb8aad7ad5 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -106,7 +106,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
_rpc_channels_num = channels.size() - local_size;
if (!_only_local_exchange) {
- _sink_buffer = p.get_sink_buffer(state->fragment_instance_id().lo);
+ _sink_buffer = p.get_sink_buffer(state,
state->fragment_instance_id().lo);
register_channels(_sink_buffer.get());
_queue_dependency = Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
"ExchangeSinkQueueDependency", true);
@@ -349,7 +349,7 @@ void ExchangeSinkOperatorX::_init_sink_buffer() {
for (auto fragment_instance_id : _fragment_instance_ids) {
ins_ids.push_back(fragment_instance_id.lo);
}
- _sink_buffer = _create_buffer(ins_ids);
+ _sink_buffer = _create_buffer(_state, ins_ids);
}
template <typename ChannelPtrType>
@@ -572,11 +572,11 @@ Status ExchangeSinkLocalState::close(RuntimeState* state,
Status exec_status) {
}
std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::_create_buffer(
- const std::vector<InstanceLoId>& sender_ins_ids) {
+ RuntimeState* state, const std::vector<InstanceLoId>& sender_ins_ids) {
PUniqueId id;
id.set_hi(_state->query_id().hi);
id.set_lo(_state->query_id().lo);
- auto sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, _dest_node_id,
_node_id, state(),
+ auto sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, _dest_node_id,
_node_id, state,
sender_ins_ids);
for (const auto& _dest : _dests) {
sink_buffer->construct_request(_dest.fragment_instance_id);
@@ -590,17 +590,17 @@ std::shared_ptr<ExchangeSinkBuffer>
ExchangeSinkOperatorX::_create_buffer(
// (Note: This does not reduce the total number of RPCs.)
// In a merge sort scenario, there are only n RPCs, so a shared sink buffer is
not needed.
std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::get_sink_buffer(
- InstanceLoId sender_ins_id) {
+ RuntimeState* state, InstanceLoId sender_ins_id) {
// When the child is SortSourceOperatorX or LocalExchangeSourceOperatorX,
// it is an order-by scenario.
// In this case, there is only one target instance, and no n * n RPC
concurrency will occur.
// Therefore, sharing a sink buffer is not necessary.
if (_dest_is_merge) {
- return _create_buffer({sender_ins_id});
+ return _create_buffer(state, {sender_ins_id});
}
if (_state->enable_shared_exchange_sink_buffer()) {
return _sink_buffer;
}
- return _create_buffer({sender_ins_id});
+ return _create_buffer(state, {sender_ins_id});
}
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h
b/be/src/pipeline/exec/exchange_sink_operator.h
index cdef5e5e119..b6a22e1a8aa 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -189,8 +189,10 @@ public:
const std::vector<TUniqueId>& fragment_instance_ids);
Status init(const TDataSink& tsink) override;
- RuntimeState* state() { return _state; }
-
+ // The state is from pipeline fragment context, it will be saved in
ExchangeSinkOperator
+ // and it will be passed to exchange sink buffer. So that exchange sink
buffer should not
+ // be used after pipeline fragment ctx. All operations in Exchange Sink
Buffer should hold
+ // TaskExecutionContext.
Status prepare(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos)
override;
@@ -217,7 +219,8 @@ public:
// Therefore, a shared sink buffer is used here to limit the number of
concurrent RPCs.
// (Note: This does not reduce the total number of RPCs.)
// In a merge sort scenario, there are only n RPCs, so a shared sink
buffer is not needed.
- std::shared_ptr<ExchangeSinkBuffer> get_sink_buffer(InstanceLoId
sender_ins_id);
+ std::shared_ptr<ExchangeSinkBuffer> get_sink_buffer(RuntimeState* state,
+ InstanceLoId
sender_ins_id);
vectorized::VExprContextSPtrs& tablet_sink_expr_ctxs() { return
_tablet_sink_expr_ctxs; }
private:
@@ -232,7 +235,7 @@ private:
// The sink buffer can be shared among multiple ExchangeSinkLocalState
instances,
// or each ExchangeSinkLocalState can have its own sink buffer.
std::shared_ptr<ExchangeSinkBuffer> _create_buffer(
- const std::vector<InstanceLoId>& sender_ins_ids);
+ RuntimeState* state, const std::vector<InstanceLoId>&
sender_ins_ids);
std::shared_ptr<ExchangeSinkBuffer> _sink_buffer = nullptr;
RuntimeState* _state = nullptr;
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index 8750ce470d9..8bcbffa6ff9 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -82,8 +82,6 @@ public:
PipelinePtr add_pipeline(PipelinePtr parent = nullptr, int idx = -1);
- RuntimeState* get_runtime_state() { return _runtime_state.get(); }
-
QueryContext* get_query_ctx() { return _query_ctx.get(); }
[[nodiscard]] bool is_canceled() const { return
_query_ctx->is_cancelled(); }
diff --git a/be/test/pipeline/operator/exchange_sink_operator_test.cpp
b/be/test/pipeline/operator/exchange_sink_operator_test.cpp
index aa3fc6f7877..27fa6523e98 100644
--- a/be/test/pipeline/operator/exchange_sink_operator_test.cpp
+++ b/be/test/pipeline/operator/exchange_sink_operator_test.cpp
@@ -61,7 +61,7 @@ struct MockExchangeSinkOperatorX : public
ExchangeSinkOperatorX {
void _init_sink_buffer() override {
std::vector<InstanceLoId> ins_ids {fragment_instance_id.lo};
- _sink_buffer = _create_buffer(ins_ids);
+ _sink_buffer = _create_buffer(_state, ins_ids);
}
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]