This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6a430910e6e [fix](shuffle) Do not return error if local recvr is null (#35329)
6a430910e6e is described below
commit 6a430910e6e5e3a1cb6dbd33133b37edb84dd2f5
Author: Gabriel <[email protected]>
AuthorDate: Sun May 26 11:44:46 2024 +0800
[fix](shuffle) Do not return error if local recvr is null (#35329)
---
be/src/pipeline/exec/exchange_sink_operator.cpp | 17 +++++++++++------
be/src/vec/sink/vdata_stream_sender.cpp | 6 +-----
2 files changed, 12 insertions(+), 11 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 69cd139714f..244184bc7a3 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -155,12 +155,17 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
size_t dep_id = 0;
for (auto* channel : channels) {
if (channel->is_local()) {
-
_local_channels_dependency.push_back(channel->get_local_channel_dependency());
- DCHECK(_local_channels_dependency[dep_id] != nullptr);
- _wait_channel_timer.push_back(_profile->add_nonzero_counter(
- fmt::format("WaitForLocalExchangeBuffer{}", dep_id),
TUnit ::TIME_NS,
- timer_name, 1));
- dep_id++;
+ if (auto dep = channel->get_local_channel_dependency()) {
+ _local_channels_dependency.push_back(dep);
+ DCHECK(_local_channels_dependency[dep_id] != nullptr);
+
_wait_channel_timer.push_back(_profile->add_nonzero_counter(
+ fmt::format("WaitForLocalExchangeBuffer{}",
dep_id), TUnit ::TIME_NS,
+ timer_name, 1));
+ dep_id++;
+ } else {
+ LOG(WARNING) << "local recvr is null: query id = "
+ << print_id(state->query_id()) << " node id =
" << p.node_id();
+ }
}
}
}
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 307e84e9290..63f2aa19515 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -178,11 +178,7 @@ Status Channel<Parent>::open(RuntimeState* state) {
std::shared_ptr<pipeline::Dependency>
PipChannel::get_local_channel_dependency() {
if (!Channel<pipeline::ExchangeSinkLocalState>::_local_recvr) {
- throw Exception(
- ErrorCode::INTERNAL_ERROR,
- "_local_recvr is null: " +
-
std::to_string(Channel<pipeline::ExchangeSinkLocalState>::_parent->parent()
- ->node_id()));
+ return nullptr;
}
return
Channel<pipeline::ExchangeSinkLocalState>::_local_recvr->get_local_channel_dependency(
Channel<pipeline::ExchangeSinkLocalState>::_parent->sender_id());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]