This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 55dd58d8994 [Refactor](Exchange) do exchange sink buffer refactor (#49335)
55dd58d8994 is described below
commit 55dd58d8994bdd28ee560ad6c2b6788ce63aafdc
Author: HappenLee <[email protected]>
AuthorDate: Thu Apr 10 22:30:52 2025 +0800
[Refactor](Exchange) do exchange sink buffer refactor (#49335)
Do exchange refactor
* remove useless checks
* change multiple maps to one struct map, so only one lookup is needed (see the sketch below)
* rename some variables to match code style
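To make the "one struct map" point concrete, here is a minimal, hypothetical C++ sketch of the
pattern (the names RpcInstanceSketch, rpc_instances and enqueue are illustrative only, not the
actual types in the diff below): all per-instance state lives in a single map entry, so each
operation performs one lookup instead of one lookup per parallel map.

    #include <cstdint>
    #include <memory>
    #include <mutex>
    #include <queue>
    #include <unordered_map>

    using InstanceLoId = int64_t;

    // Hypothetical consolidated per-instance state: one map entry owns the mutex,
    // the packet sequence number and the queue, so callers look the instance up once.
    struct RpcInstanceSketch {
        explicit RpcInstanceSketch(InstanceLoId id) : id(id) {}
        InstanceLoId id;
        std::unique_ptr<std::mutex> mutex = std::make_unique<std::mutex>();
        int64_t seq = 0;
        std::queue<int> package_queue;  // placeholder payload type
    };

    std::unordered_map<InstanceLoId, std::unique_ptr<RpcInstanceSketch>> rpc_instances;

    void enqueue(InstanceLoId id, int payload) {
        auto it = rpc_instances.find(id);  // single lookup instead of one per map
        if (it == rpc_instances.end()) {
            return;  // instance was never registered
        }
        auto& ins = *it->second;
        std::lock_guard<std::mutex> lock(*ins.mutex);
        ins.package_queue.push(payload);
        ins.seq++;
    }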
---
be/src/pipeline/exec/exchange_sink_buffer.cpp | 240 +++++++++++-------------
be/src/pipeline/exec/exchange_sink_buffer.h | 130 +++++++------
be/src/pipeline/exec/exchange_sink_operator.cpp | 32 ++--
be/src/pipeline/exec/exchange_sink_operator.h | 8 +-
be/src/vec/sink/vdata_stream_sender.cpp | 2 +-
be/src/vec/sink/vdata_stream_sender.h | 4 +-
be/test/pipeline/pipeline_test.cpp | 2 +-
be/test/vec/exec/exchange_sink_test.cpp | 58 +++---
be/test/vec/exec/exchange_sink_test.h | 6 +-
9 files changed, 241 insertions(+), 241 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 90d54a35aa0..95a5a00f68a 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -100,12 +100,7 @@ ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_
_node_id(node_id),
_state(state),
_context(state->get_query_ctx()),
- _exchange_sink_num(sender_ins_ids.size()) {
- for (auto sender_ins_id : sender_ins_ids) {
- _queue_deps.emplace(sender_ins_id, nullptr);
- _parents.emplace(sender_ins_id, nullptr);
- }
-}
+ _exchange_sink_num(sender_ins_ids.size()) {}
void ExchangeSinkBuffer::close() {
// Could not clear the queue here, because there maybe a running rpc want to
@@ -121,29 +116,34 @@ void ExchangeSinkBuffer::construct_request(TUniqueId fragment_instance_id) {
return;
}
auto low_id = fragment_instance_id.lo;
- if (_instance_to_package_queue_mutex.count(low_id)) {
+ if (_rpc_instances.contains(low_id)) {
return;
}
- _instance_to_package_queue_mutex[low_id] = std::make_unique<std::mutex>();
- _instance_to_seq[low_id] = 0;
- _instance_to_package_queue[low_id] = std::queue<TransmitInfo, std::list<TransmitInfo>>();
- _instance_to_broadcast_package_queue[low_id] =
+
+ // Initialize the instance data
+ auto instance_data = std::make_unique<RpcInstance>(low_id);
+ instance_data->mutex = std::make_unique<std::mutex>();
+ instance_data->seq = 0;
+ instance_data->package_queue = std::queue<TransmitInfo, std::list<TransmitInfo>>();
+ instance_data->broadcast_package_queue =
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>();
- _queue_capacity =
- config::exchg_buffer_queue_capacity_factor * _instance_to_package_queue.size();
+ _queue_capacity = config::exchg_buffer_queue_capacity_factor * _rpc_instances.size();
+
PUniqueId finst_id;
finst_id.set_hi(fragment_instance_id.hi);
finst_id.set_lo(fragment_instance_id.lo);
- _rpc_channel_is_idle[low_id] = true;
- _rpc_channel_is_turn_off[low_id] = false;
- _instance_to_rpc_stats_vec.emplace_back(std::make_shared<RpcInstanceStatistics>(low_id));
- _instance_to_rpc_stats[low_id] = _instance_to_rpc_stats_vec.back().get();
- _instance_to_request[low_id] = std::make_shared<PTransmitDataParams>();
- _instance_to_request[low_id]->mutable_finst_id()->CopyFrom(finst_id);
- _instance_to_request[low_id]->mutable_query_id()->CopyFrom(_query_id);
-
- _instance_to_request[low_id]->set_node_id(_dest_node_id);
- _running_sink_count[low_id] = _exchange_sink_num;
+
+ instance_data->rpc_channel_is_idle = true;
+ instance_data->rpc_channel_is_turn_off = false;
+
+ // Initialize request
+ instance_data->request = std::make_shared<PTransmitDataParams>();
+ instance_data->request->mutable_finst_id()->CopyFrom(finst_id);
+ instance_data->request->mutable_query_id()->CopyFrom(_query_id);
+ instance_data->request->set_node_id(_dest_node_id);
+ instance_data->running_sink_count = _exchange_sink_num;
+
+ _rpc_instances[low_id] = std::move(instance_data);
}
Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
@@ -151,20 +151,21 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
return Status::OK();
}
auto ins_id = request.channel->dest_ins_id();
- if (!_instance_to_package_queue_mutex.contains(ins_id)) {
+ if (!_rpc_instances.contains(ins_id)) {
return Status::InternalError("fragment_instance_id {} not do register_sink",
print_id(request.channel->_fragment_instance_id));
}
- if (_rpc_channel_is_turn_off[ins_id]) {
+ auto& instance_data = *_rpc_instances[ins_id];
+ if (instance_data.rpc_channel_is_turn_off) {
return Status::EndOfFile("receiver eof");
}
bool send_now = false;
{
- std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[ins_id]);
+ std::unique_lock<std::mutex> lock(*instance_data.mutex);
// Do not have in process rpc, directly send
- if (_rpc_channel_is_idle[ins_id]) {
+ if (instance_data.rpc_channel_is_idle) {
send_now = true;
- _rpc_channel_is_idle[ins_id] = false;
+ instance_data.rpc_channel_is_idle = false;
}
if (request.block) {
RETURN_IF_ERROR(
@@ -172,16 +173,16 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
COUNTER_UPDATE(request.channel->_parent->memory_used_counter(),
request.block->ByteSizeLong());
}
- _instance_to_package_queue[ins_id].emplace(std::move(request));
+ instance_data.package_queue.emplace(std::move(request));
_total_queue_size++;
if (_total_queue_size > _queue_capacity) {
- for (auto& [_, dep] : _queue_deps) {
+ for (auto& dep : _queue_deps) {
dep->block();
}
}
}
if (send_now) {
- RETURN_IF_ERROR(_send_rpc(ins_id));
+ RETURN_IF_ERROR(_send_rpc(instance_data));
}
return Status::OK();
@@ -192,71 +193,69 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
return Status::OK();
}
auto ins_id = request.channel->dest_ins_id();
- if (!_instance_to_package_queue_mutex.contains(ins_id)) {
+ if (!_rpc_instances.contains(ins_id)) {
return Status::InternalError("fragment_instance_id {} not do register_sink",
print_id(request.channel->_fragment_instance_id));
}
- if (_rpc_channel_is_turn_off[ins_id]) {
+ auto& instance_data = *_rpc_instances[ins_id];
+ if (instance_data.rpc_channel_is_turn_off) {
return Status::EndOfFile("receiver eof");
}
bool send_now = false;
{
- std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[ins_id]);
+ std::unique_lock<std::mutex> lock(*instance_data.mutex);
// Do not have in process rpc, directly send
- if (_rpc_channel_is_idle[ins_id]) {
+ if (instance_data.rpc_channel_is_idle) {
send_now = true;
- _rpc_channel_is_idle[ins_id] = false;
+ instance_data.rpc_channel_is_idle = false;
}
if (request.block_holder->get_block()) {
RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(
request.block_holder->get_block()->be_exec_version()));
}
- _instance_to_broadcast_package_queue[ins_id].emplace(request);
+ instance_data.broadcast_package_queue.emplace(request);
}
if (send_now) {
- RETURN_IF_ERROR(_send_rpc(ins_id));
+ RETURN_IF_ERROR(_send_rpc(instance_data));
}
return Status::OK();
}
-Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
- std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
+Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) {
+ std::unique_lock<std::mutex> lock(*(instance_data.mutex));
- std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id];
+ std::queue<TransmitInfo, std::list<TransmitInfo>>& q = instance_data.package_queue;
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =
- _instance_to_broadcast_package_queue[id];
+ instance_data.broadcast_package_queue;
if (_is_failed) {
- _turn_off_channel(id, lock);
+ _turn_off_channel(instance_data, lock);
return Status::OK();
}
- if (_rpc_channel_is_turn_off[id]) {
+ if (instance_data.rpc_channel_is_turn_off) {
return Status::OK();
}
if (!q.empty()) {
// If we have data to shuffle which is not broadcasted
auto& request = q.front();
- auto& brpc_request = _instance_to_request[id];
+ auto& brpc_request = instance_data.request;
brpc_request->set_eos(request.eos);
- brpc_request->set_packet_seq(_instance_to_seq[id]++);
+ brpc_request->set_packet_seq(instance_data.seq++);
brpc_request->set_sender_id(request.channel->_parent->sender_id());
brpc_request->set_be_number(request.channel->_parent->be_number());
if (request.block && !request.block->column_metas().empty()) {
brpc_request->set_allocated_block(request.block.get());
}
- if (!request.exec_status.ok()) {
- request.exec_status.to_protobuf(brpc_request->mutable_exec_status());
- }
- auto send_callback = request.channel->get_send_callback(id, request.eos);
+ auto send_callback = request.channel->get_send_callback(&instance_data, request.eos);
send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms);
if (config::execution_ignore_eovercrowded) {
send_callback->cntl_->ignore_eovercrowded();
}
send_callback->addFailedHandler([&, weak_task_ctx = weak_task_exec_ctx()](
- const InstanceLoId& id, const std::string& err) {
+ RpcInstance* ins, const std::string& err) {
auto task_lock = weak_task_ctx.lock();
if (task_lock == nullptr) {
// This means ExchangeSinkBuffer Ojbect already destroyed, not need run failed any more.
@@ -264,11 +263,11 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
}
// attach task for memory tracker and query id when core
SCOPED_ATTACH_TASK(_state);
- _failed(id, err);
+ _failed(ins->id, err);
});
send_callback->start_rpc_time = GetCurrentTimeNanos();
send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()](
- const InstanceLoId& id, const bool& eos,
+ RpcInstance* ins_ptr, const bool& eos,
const PTransmitDataResult& result,
const int64_t& start_rpc_time) {
auto task_lock = weak_task_ctx.lock();
@@ -279,24 +278,25 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
// attach task for memory tracker and query id when core
SCOPED_ATTACH_TASK(_state);
+ auto& ins = *ins_ptr;
auto end_rpc_time = GetCurrentTimeNanos();
- update_rpc_time(id, start_rpc_time, end_rpc_time);
+ update_rpc_time(ins, start_rpc_time, end_rpc_time);
Status s(Status::create(result.status()));
if (s.is<ErrorCode::END_OF_FILE>()) {
- _set_receiver_eof(id);
+ _set_receiver_eof(ins);
} else if (!s.ok()) {
- _failed(id,
+ _failed(ins.id,
fmt::format("exchange req success but status isn't ok:
{}", s.to_string()));
return;
} else if (eos) {
- _ended(id);
+ _ended(ins);
}
// The eos here only indicates that the current exchange sink has reached eos.
// However, the queue still contains data from other exchange sinks, so RPCs need to continue being sent.
- s = _send_rpc(id);
+ s = _send_rpc(ins);
if (!s) {
- _failed(id,
+ _failed(ins.id,
fmt::format("exchange req success but status isn't ok:
{}", s.to_string()));
}
});
@@ -322,29 +322,29 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
q.pop();
_total_queue_size--;
if (_total_queue_size <= _queue_capacity) {
- for (auto& [_, dep] : _queue_deps) {
+ for (auto& dep : _queue_deps) {
dep->set_ready();
}
}
} else if (!broadcast_q.empty()) {
// If we have data to shuffle which is broadcasted
auto& request = broadcast_q.front();
- auto& brpc_request = _instance_to_request[id];
+ auto& brpc_request = instance_data.request;
brpc_request->set_eos(request.eos);
- brpc_request->set_packet_seq(_instance_to_seq[id]++);
+ brpc_request->set_packet_seq(instance_data.seq++);
brpc_request->set_sender_id(request.channel->_parent->sender_id());
brpc_request->set_be_number(request.channel->_parent->be_number());
if (request.block_holder->get_block() &&
!request.block_holder->get_block()->column_metas().empty()) {
brpc_request->set_allocated_block(request.block_holder->get_block());
}
- auto send_callback = request.channel->get_send_callback(id, request.eos);
+ auto send_callback = request.channel->get_send_callback(&instance_data, request.eos);
send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms);
if (config::execution_ignore_eovercrowded) {
send_callback->cntl_->ignore_eovercrowded();
}
send_callback->addFailedHandler([&, weak_task_ctx = weak_task_exec_ctx()](
- const InstanceLoId& id, const std::string& err) {
+ RpcInstance* ins, const std::string& err) {
auto task_lock = weak_task_ctx.lock();
if (task_lock == nullptr) {
// This means ExchangeSinkBuffer Ojbect already destroyed, not need run failed any more.
@@ -352,11 +352,11 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
}
// attach task for memory tracker and query id when core
SCOPED_ATTACH_TASK(_state);
- _failed(id, err);
+ _failed(ins->id, err);
});
send_callback->start_rpc_time = GetCurrentTimeNanos();
send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()](
- const InstanceLoId& id, const bool& eos,
+ RpcInstance* ins_ptr, const bool& eos,
const PTransmitDataResult& result,
const int64_t& start_rpc_time) {
auto task_lock = weak_task_ctx.lock();
@@ -366,26 +366,26 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
}
// attach task for memory tracker and query id when core
SCOPED_ATTACH_TASK(_state);
-
+ auto& ins = *ins_ptr;
auto end_rpc_time = GetCurrentTimeNanos();
- update_rpc_time(id, start_rpc_time, end_rpc_time);
+ update_rpc_time(ins, start_rpc_time, end_rpc_time);
Status s(Status::create(result.status()));
if (s.is<ErrorCode::END_OF_FILE>()) {
- _set_receiver_eof(id);
+ _set_receiver_eof(ins);
} else if (!s.ok()) {
- _failed(id,
+ _failed(ins.id,
fmt::format("exchange req success but status isn't ok:
{}", s.to_string()));
return;
} else if (eos) {
- _ended(id);
+ _ended(ins);
}
// The eos here only indicates that the current exchange sink has reached eos.
// However, the queue still contains data from other exchange sinks, so RPCs need to continue being sent.
- s = _send_rpc(id);
+ s = _send_rpc(ins);
if (!s) {
- _failed(id,
+ _failed(ins.id,
fmt::format("exchange req success but status isn't ok:
{}", s.to_string()));
}
});
@@ -408,29 +408,17 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
}
broadcast_q.pop();
} else {
- _rpc_channel_is_idle[id] = true;
+ instance_data.rpc_channel_is_idle = true;
}
return Status::OK();
}
-void ExchangeSinkBuffer::_ended(InstanceLoId id) {
- if (!_instance_to_package_queue_mutex.contains(id)) {
- std::stringstream ss;
- ss << "failed find the instance id:" << id
- << " now mutex map size:" <<
_instance_to_package_queue_mutex.size();
- for (const auto& p : _instance_to_package_queue_mutex) {
- ss << " key:" << p.first << " value:" << p.second << "\n";
- }
- LOG(INFO) << ss.str();
-
- throw Exception(Status::FatalError("not find the instance id"));
- } else {
- std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
- _running_sink_count[id]--;
- if (_running_sink_count[id] == 0) {
- _turn_off_channel(id, lock);
- }
+void ExchangeSinkBuffer::_ended(RpcInstance& ins) {
+ std::unique_lock<std::mutex> lock(*ins.mutex);
+ ins.running_sink_count--;
+ if (ins.running_sink_count == 0) {
+ _turn_off_channel(ins, lock);
}
}
@@ -441,15 +429,15 @@ void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {
_context->cancel(Status::Cancelled(err));
}
-void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
- std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
+void ExchangeSinkBuffer::_set_receiver_eof(RpcInstance& ins) {
+ std::unique_lock<std::mutex> lock(*ins.mutex);
// When the receiving side reaches eof, it means the receiver has finished early.
// The remaining data in the current rpc_channel does not need to be sent,
// and the rpc_channel should be turned off immediately.
- Defer turn_off([&]() { _turn_off_channel(id, lock); });
+ Defer turn_off([&]() { _turn_off_channel(ins, lock); });
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =
- _instance_to_broadcast_package_queue[id];
+ ins.broadcast_package_queue;
for (; !broadcast_q.empty(); broadcast_q.pop()) {
if (broadcast_q.front().block_holder->get_block()) {
COUNTER_UPDATE(broadcast_q.front().channel->_parent->memory_used_counter(),
@@ -461,7 +449,7 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
swap(empty, broadcast_q);
}
- std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id];
+ std::queue<TransmitInfo, std::list<TransmitInfo>>& q = ins.package_queue;
for (; !q.empty(); q.pop()) {
// Must update _total_queue_size here, otherwise if _total_queue_size > _queue_capacity at EOF,
// ExchangeSinkQueueDependency will be blocked and pipeline will be deadlocked
@@ -474,7 +462,7 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
// Try to wake up pipeline after clearing the queue
if (_total_queue_size <= _queue_capacity) {
- for (auto& [_, dep] : _queue_deps) {
+ for (auto& dep : _queue_deps) {
dep->set_ready();
}
}
@@ -486,20 +474,20 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
}
// The unused parameter `with_lock` is to ensure that the function is called when the lock is held.
-void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId id,
+void ExchangeSinkBuffer::_turn_off_channel(RpcInstance& ins,
std::unique_lock<std::mutex>& /*with_lock*/) {
- if (!_rpc_channel_is_idle[id]) {
- _rpc_channel_is_idle[id] = true;
+ if (!ins.rpc_channel_is_idle) {
+ ins.rpc_channel_is_idle = true;
}
// Ensure that each RPC is turned off only once.
- if (_rpc_channel_is_turn_off[id]) {
+ if (ins.rpc_channel_is_turn_off) {
return;
}
- _rpc_channel_is_turn_off[id] = true;
+ ins.rpc_channel_is_turn_off = true;
auto weak_task_ctx = weak_task_exec_ctx();
if (auto pip_ctx = weak_task_ctx.lock()) {
- for (auto& [_, parent] : _parents) {
- parent->on_channel_finished(id);
+ for (auto& parent : _parents) {
+ parent->on_channel_finished(ins.id);
}
}
}
@@ -507,10 +495,10 @@ void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId id,
void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* min_time) {
int64_t local_max_time = 0;
int64_t local_min_time = INT64_MAX;
- for (auto& [id, stats] : _instance_to_rpc_stats) {
- if (stats->sum_time != 0) {
- local_max_time = std::max(local_max_time, stats->sum_time);
- local_min_time = std::min(local_min_time, stats->sum_time);
+ for (auto& [_, ins] : _rpc_instances) {
+ if (ins->stats.sum_time != 0) {
+ local_max_time = std::max(local_max_time, ins->stats.sum_time);
+ local_min_time = std::min(local_min_time, ins->stats.sum_time);
}
}
*max_time = local_max_time;
@@ -519,24 +507,22 @@ void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* min_ti
int64_t ExchangeSinkBuffer::get_sum_rpc_time() {
int64_t sum_time = 0;
- for (auto& [id, stats] : _instance_to_rpc_stats) {
- sum_time += stats->sum_time;
+ for (auto& [_, ins] : _rpc_instances) {
+ sum_time += ins->stats.sum_time;
}
return sum_time;
}
-void ExchangeSinkBuffer::update_rpc_time(InstanceLoId id, int64_t start_rpc_time,
+void ExchangeSinkBuffer::update_rpc_time(RpcInstance& ins, int64_t start_rpc_time,
int64_t receive_rpc_time) {
_rpc_count++;
int64_t rpc_spend_time = receive_rpc_time - start_rpc_time;
- DCHECK(_instance_to_rpc_stats.find(id) != _instance_to_rpc_stats.end());
if (rpc_spend_time > 0) {
- ++_instance_to_rpc_stats[id]->rpc_count;
- _instance_to_rpc_stats[id]->sum_time += rpc_spend_time;
- _instance_to_rpc_stats[id]->max_time =
- std::max(_instance_to_rpc_stats[id]->max_time, rpc_spend_time);
- _instance_to_rpc_stats[id]->min_time =
- std::min(_instance_to_rpc_stats[id]->min_time, rpc_spend_time);
+ auto& stats = ins.stats;
+ ++stats.rpc_count;
+ stats.sum_time += rpc_spend_time;
+ stats.max_time = std::max(stats.max_time, rpc_spend_time);
+ stats.min_time = std::min(stats.min_time, rpc_spend_time);
}
}
@@ -561,21 +547,21 @@ void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) {
// This counter will lead to performance degradation.
// So only collect this information when the profile level is greater than 3.
if (_state->profile_level() > 3 && max_count > 0) {
- std::vector<RpcInstanceStatistics> tmp_rpc_stats_vec;
- for (const auto& stats : _instance_to_rpc_stats_vec) {
- tmp_rpc_stats_vec.emplace_back(*stats);
+ std::vector<std::pair<InstanceLoId, RpcInstanceStatistics>> tmp_rpc_stats_vec;
+ for (const auto& [id, ins] : _rpc_instances) {
+ tmp_rpc_stats_vec.emplace_back(id, ins->stats);
}
pdqsort(tmp_rpc_stats_vec.begin(), tmp_rpc_stats_vec.end(),
- [](const auto& a, const auto& b) { return a.max_time > b.max_time; });
+ [](const auto& a, const auto& b) { return a.second.max_time > b.second.max_time; });
auto count = std::min((size_t)max_count, tmp_rpc_stats_vec.size());
int i = 0;
auto* detail_profile = profile->create_child("RpcInstanceDetails", true, true);
- for (const auto& stats : tmp_rpc_stats_vec) {
+ for (const auto& [id, stats] : tmp_rpc_stats_vec) {
if (0 == stats.rpc_count) {
continue;
}
std::stringstream out;
- out << "Instance " << std::hex << stats.inst_lo_id;
+ out << "Instance " << std::hex << id;
auto stats_str = fmt::format(
"Count: {}, MaxTime: {}, MinTime: {}, AvgTime: {},
SumTime: {}",
stats.rpc_count, PrettyPrinter::print(stats.max_time,
TUnit::TIME_NS),
@@ -594,10 +580,10 @@ void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) {
std::string ExchangeSinkBuffer::debug_each_instance_queue_size() {
fmt::memory_buffer debug_string_buffer;
- for (auto& [id, m] : _instance_to_package_queue_mutex) {
- std::unique_lock<std::mutex> lock(*m);
+ for (auto& [id, instance_data] : _rpc_instances) {
+ std::unique_lock<std::mutex> lock(*instance_data->mutex);
fmt::format_to(debug_string_buffer, "Instance: {}, queue size: {}\n", id,
- _instance_to_package_queue[id].size());
+ instance_data->package_queue.size());
}
return fmt::to_string(debug_string_buffer);
}
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h
index 90be213c9d6..9a00ef072d5 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -114,7 +114,6 @@ struct TransmitInfo {
vectorized::Channel* channel = nullptr;
std::unique_ptr<PBlock> block;
bool eos;
- Status exec_status;
};
struct BroadcastTransmitInfo {
@@ -123,6 +122,49 @@ struct BroadcastTransmitInfo {
bool eos;
};
+struct RpcInstanceStatistics {
+ int64_t rpc_count = 0;
+ int64_t max_time = 0;
+ int64_t min_time = INT64_MAX;
+ int64_t sum_time = 0;
+};
+
+// Consolidated structure for RPC instance data
+struct RpcInstance {
+ // Constructor initializes the instance with the given ID
+ RpcInstance(InstanceLoId id) : id(id) {}
+
+ // Unique identifier for this RPC instance
+ InstanceLoId id;
+
+ // Mutex for thread-safe access to this instance's data
+ std::unique_ptr<std::mutex> mutex;
+
+ // Sequence number for RPC packets, incremented for each packet sent
+ int64_t seq = 0;
+
+ // Queue for regular data transmission requests
+ std::queue<TransmitInfo, std::list<TransmitInfo>> package_queue;
+
+ // Queue for broadcast data transmission requests
+ std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> broadcast_package_queue;
+
+ // RPC request parameters for data transmission
+ std::shared_ptr<PTransmitDataParams> request;
+
+ // Flag indicating if the RPC channel is currently idle (no active RPC)
+ bool rpc_channel_is_idle = true;
+
+ // Flag indicating if the RPC channel has been turned off (no more RPCs will be sent)
+ bool rpc_channel_is_turn_off = false;
+
+ // Statistics for monitoring RPC performance (latency, counts, etc.)
+ RpcInstanceStatistics stats;
+
+ // Count of active exchange sinks using this RPC instance
+ int64_t running_sink_count = 0;
+};
+
template <typename Response>
class ExchangeSendCallback : public ::doris::DummyBrpcCallback<Response> {
ENABLE_FACTORY_CREATOR(ExchangeSendCallback);
@@ -130,20 +172,19 @@ class ExchangeSendCallback : public ::doris::DummyBrpcCallback<Response> {
public:
ExchangeSendCallback() = default;
- void init(InstanceLoId id, bool eos) {
- _id = id;
+ void init(pipeline::RpcInstance* ins, bool eos) {
+ _ins = ins;
_eos = eos;
}
~ExchangeSendCallback() override = default;
ExchangeSendCallback(const ExchangeSendCallback& other) = delete;
ExchangeSendCallback& operator=(const ExchangeSendCallback& other) = delete;
- void addFailedHandler(
- const std::function<void(const InstanceLoId&, const std::string&)>& fail_fn) {
+ void addFailedHandler(const std::function<void(RpcInstance*, const std::string&)>& fail_fn) {
_fail_fn = fail_fn;
}
- void addSuccessHandler(const std::function<void(const InstanceLoId&, const bool&,
- const Response&, const int64_t&)>& suc_fn) {
+ void addSuccessHandler(const std::function<void(RpcInstance*, const bool&, const Response&,
+ const int64_t&)>& suc_fn) {
_suc_fn = suc_fn;
}
@@ -157,9 +198,9 @@ public:
::doris::DummyBrpcCallback<Response>::cntl_->ErrorText(),
BackendOptions::get_localhost(),
::doris::DummyBrpcCallback<Response>::cntl_->latency_us());
- _fail_fn(_id, err);
+ _fail_fn(_ins, err);
} else {
- _suc_fn(_id, _eos, *(::doris::DummyBrpcCallback<Response>::response_),
+ _suc_fn(_ins, _eos, *(::doris::DummyBrpcCallback<Response>::response_),
start_rpc_time);
}
} catch (const std::exception& exp) {
@@ -172,9 +213,9 @@ public:
int64_t start_rpc_time;
private:
- std::function<void(const InstanceLoId&, const std::string&)> _fail_fn;
- std::function<void(const InstanceLoId&, const bool&, const Response&, const int64_t&)> _suc_fn;
- InstanceLoId _id;
+ std::function<void(RpcInstance*, const std::string&)> _fail_fn;
+ std::function<void(RpcInstance*, const bool&, const Response&, const int64_t&)> _suc_fn;
+ RpcInstance* _ins;
bool _eos;
};
@@ -237,15 +278,14 @@ public:
Status add_block(TransmitInfo&& request);
Status add_block(BroadcastTransmitInfo&& request);
void close();
- void update_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_rpc_time);
+ void update_rpc_time(RpcInstance& ins, int64_t start_rpc_time, int64_t receive_rpc_time);
void update_profile(RuntimeProfile* profile);
void set_dependency(InstanceLoId sender_ins_id, std::shared_ptr<Dependency> queue_dependency,
ExchangeSinkLocalState* local_state) {
- DCHECK(_queue_deps.contains(sender_ins_id));
- DCHECK(_parents.contains(sender_ins_id));
- _queue_deps[sender_ins_id] = queue_dependency;
- _parents[sender_ins_id] = local_state;
+ std::lock_guard l(_m);
+ _queue_deps.push_back(queue_dependency);
+ _parents.push_back(local_state);
}
void set_low_memory_mode() { _queue_capacity = 8; }
@@ -257,38 +297,9 @@ private:
#endif
friend class ExchangeSinkLocalState;
- phmap::flat_hash_map<InstanceLoId, std::unique_ptr<std::mutex>>
- _instance_to_package_queue_mutex;
- // store data in non-broadcast shuffle
- phmap::flat_hash_map<InstanceLoId, std::queue<TransmitInfo, std::list<TransmitInfo>>>
- _instance_to_package_queue;
+ // Single map to store all RPC instance data
+ phmap::flat_hash_map<InstanceLoId, std::unique_ptr<RpcInstance>> _rpc_instances;
std::atomic<size_t> _queue_capacity;
- // store data in broadcast shuffle
- phmap::flat_hash_map<InstanceLoId,
- std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>>
- _instance_to_broadcast_package_queue;
- using PackageSeq = int64_t;
- // must init zero
- // TODO: make all flat_hash_map to a STRUT
- phmap::flat_hash_map<InstanceLoId, PackageSeq> _instance_to_seq;
- phmap::flat_hash_map<InstanceLoId, std::shared_ptr<PTransmitDataParams>> _instance_to_request;
- // One channel is corresponding to a downstream instance.
- phmap::flat_hash_map<InstanceLoId, bool> _rpc_channel_is_idle;
-
- // There could be multiple situations that cause an rpc_channel to be turned off,
- // such as receiving the eof, manual cancellation by the user, or all sinks reaching eos.
- // Therefore, it is necessary to prevent an rpc_channel from being turned off multiple times.
- phmap::flat_hash_map<InstanceLoId, bool> _rpc_channel_is_turn_off;
- struct RpcInstanceStatistics {
- RpcInstanceStatistics(InstanceLoId id) : inst_lo_id(id) {}
- InstanceLoId inst_lo_id;
- int64_t rpc_count = 0;
- int64_t max_time = 0;
- int64_t min_time = INT64_MAX;
- int64_t sum_time = 0;
- };
- std::vector<std::shared_ptr<RpcInstanceStatistics>> _instance_to_rpc_stats_vec;
- phmap::flat_hash_map<InstanceLoId, RpcInstanceStatistics*> _instance_to_rpc_stats;
// It is set to true only when an RPC fails. Currently, we do not have an error retry mechanism.
// If an RPC error occurs, the query will be canceled.
@@ -301,19 +312,19 @@ private:
RuntimeState* _state = nullptr;
QueryContext* _context = nullptr;
- Status _send_rpc(InstanceLoId);
+ Status _send_rpc(RpcInstance& ins);
#ifndef BE_TEST
- inline void _ended(InstanceLoId id);
+ inline void _ended(RpcInstance& ins);
inline void _failed(InstanceLoId id, const std::string& err);
- inline void _set_receiver_eof(InstanceLoId id);
- inline void _turn_off_channel(InstanceLoId id, std::unique_lock<std::mutex>& with_lock);
+ inline void _set_receiver_eof(RpcInstance& ins);
+ inline void _turn_off_channel(RpcInstance& ins,
std::unique_lock<std::mutex>& with_lock);
#else
- virtual void _ended(InstanceLoId id);
+ virtual void _ended(RpcInstance& ins);
virtual void _failed(InstanceLoId id, const std::string& err);
- virtual void _set_receiver_eof(InstanceLoId id);
- virtual void _turn_off_channel(InstanceLoId id, std::unique_lock<std::mutex>& with_lock);
+ virtual void _set_receiver_eof(RpcInstance& ins);
+ virtual void _turn_off_channel(RpcInstance& ins, std::unique_lock<std::mutex>& with_lock);
#endif
void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time);
@@ -323,13 +334,12 @@ private:
// Any modification to instance_to_package_queue requires a corresponding modification to _total_queue_size.
std::atomic<int> _total_queue_size = 0;
- // _running_sink_count is used to track how many sinks have not finished yet.
- // It is only decremented when eos is reached.
- phmap::flat_hash_map<InstanceLoId, int64_t> _running_sink_count;
+ // protected the `_queue_deps` and `_parents`
+ std::mutex _m;
// _queue_deps is used for memory control.
- phmap::flat_hash_map<InstanceLoId, std::shared_ptr<Dependency>> _queue_deps;
+ std::vector<std::shared_ptr<Dependency>> _queue_deps;
// The ExchangeSinkLocalState in _parents is only used in _turn_off_channel.
- phmap::flat_hash_map<InstanceLoId, ExchangeSinkLocalState*> _parents;
+ std::vector<ExchangeSinkLocalState*> _parents;
const int64_t _exchange_sink_num;
};
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index b105985e11a..e944cb78f67 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -102,10 +102,10 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
_last_local_channel_idx = i;
}
}
- only_local_exchange = local_size == channels.size();
+ _only_local_exchange = local_size == channels.size();
_rpc_channels_num = channels.size() - local_size;
- if (!only_local_exchange) {
+ if (!_only_local_exchange) {
_sink_buffer = p.get_sink_buffer(state->fragment_instance_id().lo);
register_channels(_sink_buffer.get());
_queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
@@ -220,12 +220,9 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
id.set_hi(_state->query_id().hi);
id.set_lo(_state->query_id().lo);
- if ((_part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) &&
- !only_local_exchange) {
- _broadcast_dependency = Dependency::create_shared(
- _parent->operator_id(), _parent->node_id(), "BroadcastDependency", true);
+ if ((_part_type == TPartitionType::UNPARTITIONED) && !_only_local_exchange) {
_broadcast_pb_mem_limiter =
- vectorized::BroadcastPBlockHolderMemLimiter::create_shared(_broadcast_dependency);
+ vectorized::BroadcastPBlockHolderMemLimiter::create_shared(_queue_dependency);
} else if (_last_local_channel_idx > -1) {
size_t dep_id = 0;
for (auto& channel : channels) {
@@ -298,6 +295,20 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
if (sink.__isset.output_tuple_id) {
_output_tuple_id = sink.output_tuple_id;
}
+
+ // Bucket shuffle may contain some same bucket so no need change the BUCKET_SHFFULE_HASH_PARTITIONED
+ if (_part_type != TPartitionType::UNPARTITIONED &&
+ _part_type != TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
+ // if the destinations only one dest, we need to use broadcast
+ std::unordered_set<UniqueId> dest_fragment_ids_set;
+ for (auto& dest : _dests) {
+ dest_fragment_ids_set.insert(dest.fragment_instance_id);
+ if (dest_fragment_ids_set.size() > 1) {
+ break;
+ }
+ }
+ _part_type = dest_fragment_ids_set.size() == 1 ? TPartitionType::UNPARTITIONED : _part_type;
+ }
}
Status ExchangeSinkOperatorX::init(const TDataSink& tsink) {
@@ -368,11 +379,11 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
set_low_memory_mode(state);
}
- if (_part_type == TPartitionType::UNPARTITIONED || local_state.channels.size() == 1) {
+ if (_part_type == TPartitionType::UNPARTITIONED) {
// 1. serialize depends on it is not local exchange
// 2. send block
// 3. rollover block
- if (local_state.only_local_exchange) {
+ if (local_state._only_local_exchange) {
if (!block->empty()) {
Status status;
size_t idx = 0;
@@ -549,9 +560,6 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) {
}
COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time());
- if (_broadcast_dependency) {
- COUNTER_UPDATE(_wait_broadcast_buffer_timer, _broadcast_dependency->watcher_elapse_time());
- }
for (size_t i = 0; i < _local_channels_dependency.size(); i++) {
COUNTER_UPDATE(_wait_channel_timer[i],
_local_channels_dependency[i]->watcher_elapse_time());
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index f9cf4f90748..cdef5e5e119 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -62,9 +62,6 @@ public:
if (_queue_dependency) {
dep_vec.push_back(_queue_dependency.get());
}
- if (_broadcast_dependency) {
- dep_vec.push_back(_broadcast_dependency.get());
- }
std::for_each(_local_channels_dependency.begin(), _local_channels_dependency.end(),
[&](std::shared_ptr<Dependency> dep) { dep_vec.push_back(dep.get()); });
return dep_vec;
@@ -104,7 +101,7 @@ public:
}
std::vector<std::shared_ptr<vectorized::Channel>> channels;
int current_channel_idx {0}; // index of current channel to send to if _random == true
- bool only_local_exchange {false};
+ bool _only_local_exchange {false};
void on_channel_finished(InstanceLoId channel_id);
vectorized::PartitionerBase* partitioner() const { return _partitioner.get(); }
@@ -145,7 +142,6 @@ private:
vectorized::BlockSerializer _serializer;
std::shared_ptr<Dependency> _queue_dependency = nullptr;
- std::shared_ptr<Dependency> _broadcast_dependency = nullptr;
/**
* We use this to control the execution for local exchange.
@@ -202,7 +198,7 @@ public:
bool is_serial_operator() const override { return true; }
void set_low_memory_mode(RuntimeState* state) override {
auto& local_state = get_local_state(state);
- // When `local_state.only_local_exchange` the `sink_buffer` is nullptr.
+ // When `local_state._only_local_exchange` the `sink_buffer` is nullptr.
if (local_state._sink_buffer) {
local_state._sink_buffer->set_low_memory_mode();
}
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index b4d6331c6f6..3f09d02f20f 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -174,7 +174,7 @@ Status Channel::send_remote_block(std::unique_ptr<PBlock>&& block, bool eos) {
}
}
if (eos || block->column_metas_size()) {
- RETURN_IF_ERROR(_buffer->add_block({this, std::move(block), eos, Status::OK()}));
+ RETURN_IF_ERROR(_buffer->add_block({this, std::move(block), eos}));
}
return Status::OK();
}
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index c7b96c8638c..f62391d66b1 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -166,13 +166,13 @@ public:
InstanceLoId dest_ins_id() const { return _fragment_instance_id.lo; }
std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>> get_send_callback(
- InstanceLoId id, bool eos) {
+ pipeline::RpcInstance* ins, bool eos) {
if (!_send_callback) {
_send_callback =
pipeline::ExchangeSendCallback<PTransmitDataResult>::create_shared();
} else {
_send_callback->cntl_->Reset();
}
- _send_callback->init(id, eos);
+ _send_callback->init(ins, eos);
return _send_callback;
}
diff --git a/be/test/pipeline/pipeline_test.cpp b/be/test/pipeline/pipeline_test.cpp
index 6c2c99931a4..305e895b7b3 100644
--- a/be/test/pipeline/pipeline_test.cpp
+++ b/be/test/pipeline/pipeline_test.cpp
@@ -321,7 +321,7 @@ TEST_F(PipelineTest, HAPPY_PATH) {
auto& sink_local_state =
_runtime_states.back().front()->get_sink_local_state()->cast<ExchangeSinkLocalState>();
EXPECT_EQ(sink_local_state.channels.size(), 1);
- EXPECT_EQ(sink_local_state.only_local_exchange, true);
+ EXPECT_EQ(sink_local_state._only_local_exchange, true);
EXPECT_EQ(local_state.stream_recvr->sender_queues().size(), 1);
diff --git a/be/test/vec/exec/exchange_sink_test.cpp b/be/test/vec/exec/exchange_sink_test.cpp
index 7dbd352bd3a..0643d3c67b8 100644
--- a/be/test/vec/exec/exchange_sink_test.cpp
+++ b/be/test/vec/exec/exchange_sink_test.cpp
@@ -47,12 +47,12 @@ TEST_F(ExchangeSInkTest, test_normal_end) {
EXPECT_EQ(sink3.add_block(dest_ins_id_2, true), Status::OK());
EXPECT_EQ(sink3.add_block(dest_ins_id_3, true), Status::OK());
- for (auto [id, count] : buffer->_running_sink_count) {
- EXPECT_EQ(count, 3) << "id : " << id;
+ for (const auto& [id, instance] : buffer->_rpc_instances) {
+ EXPECT_EQ(instance->running_sink_count, 3) << "id : " << id;
}
- for (auto [id, is_turn_off] : buffer->_rpc_channel_is_turn_off) {
- EXPECT_EQ(is_turn_off, false) << "id : " << id;
+ for (const auto& [id, instance] : buffer->_rpc_instances) {
+ EXPECT_EQ(instance->rpc_channel_is_turn_off, false) << "id : " << id;
}
pop_block(dest_ins_id_1, PopState::accept);
@@ -67,12 +67,12 @@ TEST_F(ExchangeSInkTest, test_normal_end) {
pop_block(dest_ins_id_3, PopState::accept);
pop_block(dest_ins_id_3, PopState::accept);
- for (auto [id, count] : buffer->_running_sink_count) {
- EXPECT_EQ(count, 0) << "id : " << id;
+ for (const auto& [id, instance] : buffer->_rpc_instances) {
+ EXPECT_EQ(instance->running_sink_count, 0) << "id : " << id;
}
- for (auto [id, is_turn_off] : buffer->_rpc_channel_is_turn_off) {
- EXPECT_EQ(is_turn_off, true) << "id : " << id;
+ for (const auto& [id, instance] : buffer->_rpc_instances) {
+ EXPECT_EQ(instance->rpc_channel_is_turn_off, true) << "id : " << id;
}
clear_all_done();
}
@@ -99,17 +99,17 @@ TEST_F(ExchangeSInkTest, test_eof_end) {
EXPECT_EQ(sink3.add_block(dest_ins_id_2, true), Status::OK());
EXPECT_EQ(sink3.add_block(dest_ins_id_3, false), Status::OK());
- for (auto [id, count] : buffer->_running_sink_count) {
- EXPECT_EQ(count, 3) << "id : " << id;
+ for (const auto& [id, instance] : buffer->_rpc_instances) {
+ EXPECT_EQ(instance->running_sink_count, 3) << "id : " << id;
}
- for (auto [id, is_turn_off] : buffer->_rpc_channel_is_turn_off) {
- EXPECT_EQ(is_turn_off, false) << "id : " << id;
+ for (const auto& [id, instance] : buffer->_rpc_instances) {
+ EXPECT_EQ(instance->rpc_channel_is_turn_off, false) << "id : " << id;
}
pop_block(dest_ins_id_1, PopState::eof);
- EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_1], true);
- EXPECT_TRUE(buffer->_instance_to_package_queue[dest_ins_id_1].empty());
+ EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_1]->rpc_channel_is_turn_off, true);
+ EXPECT_TRUE(buffer->_rpc_instances[dest_ins_id_1]->package_queue.empty());
pop_block(dest_ins_id_2, PopState::accept);
pop_block(dest_ins_id_2, PopState::accept);
@@ -119,9 +119,11 @@ TEST_F(ExchangeSInkTest, test_eof_end) {
pop_block(dest_ins_id_3, PopState::accept);
pop_block(dest_ins_id_3, PopState::accept);
- EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_1], true);
- EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_2], false) << "not all eos";
- EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_3], false) << " not all eos";
+ EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_1]->rpc_channel_is_turn_off, true);
+ EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_2]->rpc_channel_is_turn_off, false)
+ << "not all eos";
+ EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_3]->rpc_channel_is_turn_off, false)
+ << " not all eos";
EXPECT_TRUE(sink1.add_block(dest_ins_id_1, true).is<ErrorCode::END_OF_FILE>());
EXPECT_EQ(sink1.add_block(dest_ins_id_2, true), Status::OK());
@@ -129,10 +131,10 @@ TEST_F(ExchangeSInkTest, test_eof_end) {
pop_block(dest_ins_id_2, PopState::accept);
pop_block(dest_ins_id_3, PopState::accept);
- EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_1], true);
- EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_2], true);
- EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_3], false);
- EXPECT_EQ(buffer->_running_sink_count[dest_ins_id_3], 1);
+ EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_1]->rpc_channel_is_turn_off, true);
+ EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_2]->rpc_channel_is_turn_off, true);
+ EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_3]->rpc_channel_is_turn_off, false);
+ EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_3]->running_sink_count, 1);
clear_all_done();
}
@@ -159,12 +161,12 @@ TEST_F(ExchangeSInkTest, test_error_end) {
EXPECT_EQ(sink3.add_block(dest_ins_id_2, false), Status::OK());
EXPECT_EQ(sink3.add_block(dest_ins_id_3, false), Status::OK());
- for (auto [id, count] : buffer->_running_sink_count) {
- EXPECT_EQ(count, 3) << "id : " << id;
+ for (const auto& [id, instance] : buffer->_rpc_instances) {
+ EXPECT_EQ(instance->running_sink_count, 3) << "id : " << id;
}
- for (auto [id, is_turn_off] : buffer->_rpc_channel_is_turn_off) {
- EXPECT_EQ(is_turn_off, false) << "id : " << id;
+ for (const auto& [id, instance] : buffer->_rpc_instances) {
+ EXPECT_EQ(instance->rpc_channel_is_turn_off, false) << "id : " << id;
}
pop_block(dest_ins_id_2, PopState::error);
@@ -226,9 +228,9 @@ TEST_F(ExchangeSInkTest, test_queue_size) {
std::cout << "each queue size : \n" << buffer->debug_each_instance_queue_size() << "\n";
- EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_1], false);
- EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_2], true);
- EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_3], false);
+ EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_1]->rpc_channel_is_turn_off, false);
+ EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_2]->rpc_channel_is_turn_off, true);
+ EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_3]->rpc_channel_is_turn_off, false);
clear_all_done();
}
}
diff --git a/be/test/vec/exec/exchange_sink_test.h b/be/test/vec/exec/exchange_sink_test.h
index 253d7b267f9..59004a53a9e 100644
--- a/be/test/vec/exec/exchange_sink_test.h
+++ b/be/test/vec/exec/exchange_sink_test.h
@@ -138,10 +138,8 @@ struct SinkWithChannel {
std::map<int64_t, std::shared_ptr<Channel>> channels;
Status add_block(int64_t id, bool eos) {
auto channel = channels[id];
- TransmitInfo transmitInfo {.channel = channel.get(),
- .block = std::make_unique<PBlock>(),
- .eos = eos,
- .exec_status = Status::OK()};
+ TransmitInfo transmitInfo {
+ .channel = channel.get(), .block = std::make_unique<PBlock>(), .eos = eos};
return buffer->add_block(std::move(transmitInfo));
}
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]