yiguolei commented on code in PR #45375:
URL: https://github.com/apache/doris/pull/45375#discussion_r1946335904
##########
be/src/pipeline/local_exchange/local_exchanger.cpp:
##########
@@ -27,129 +28,193 @@
namespace doris::pipeline {
#include "common/compile_check_begin.h"
-template <typename BlockType>
-void Exchanger<BlockType>::_enqueue_data_and_set_ready(int channel_id,
-
LocalExchangeSinkLocalState* local_state,
- BlockType&& block) {
+
+template <typename QueueType>
+void Exchanger<QueueType>::_enqueue_data_and_set_ready(
+ RuntimeProfile::Counter* enqueue_rows_counter, int channel_id,
+ LocalExchangeSinkLocalState* local_state, typename
QueueType::BlockType&& block) {
if (local_state == nullptr) {
- _enqueue_data_and_set_ready(channel_id, std::move(block));
+ _enqueue_data_and_set_ready(enqueue_rows_counter, channel_id,
std::move(block));
return;
}
size_t allocated_bytes = 0;
+ int64_t rows = 0;
// PartitionedBlock is used by shuffle exchanger.
// PartitionedBlock will be push into multiple queues with different row
ranges, so it will be
// referenced multiple times. Otherwise, we only ref the block once
because it is only push into
// one queue.
- if constexpr (std::is_same_v<PartitionedBlock, BlockType> ||
- std::is_same_v<BroadcastBlock, BlockType>) {
+ if constexpr (std::is_same_v<PartitionedBlock, typename
QueueType::BlockType> ||
+ std::is_same_v<BroadcastBlock, typename
QueueType::BlockType>) {
allocated_bytes = block.first->data_block.allocated_bytes();
+ rows = block.second.length;
} else {
block->ref(1);
allocated_bytes = block->data_block.allocated_bytes();
+ rows = block->data_block.rows();
}
std::unique_lock l(*_m[channel_id]);
- local_state->_shared_state->add_mem_usage(channel_id, allocated_bytes,
-
!std::is_same_v<PartitionedBlock, BlockType> &&
-
!std::is_same_v<BroadcastBlock, BlockType>);
- if (_data_queue[channel_id].enqueue(std::move(block))) {
+ local_state->_shared_state->add_mem_usage(
+ channel_id, allocated_bytes,
+ !std::is_same_v<PartitionedBlock, typename QueueType::BlockType> &&
+ !std::is_same_v<BroadcastBlock, typename
QueueType::BlockType>);
+ if (_data_queue[channel_id]->enqueue(std::move(block))) {
+ COUNTER_UPDATE(enqueue_rows_counter, rows);
local_state->_shared_state->set_ready_to_read(channel_id);
} else {
local_state->_shared_state->sub_mem_usage(channel_id, allocated_bytes);
// `enqueue(block)` return false iff this queue's source operator is
already closed so we
// just unref the block.
- if constexpr (std::is_same_v<PartitionedBlock, BlockType> ||
- std::is_same_v<BroadcastBlock, BlockType>) {
- block.first->unref(local_state->_shared_state, allocated_bytes,
channel_id);
+ if constexpr (std::is_same_v<PartitionedBlock, typename
QueueType::BlockType> ||
+ std::is_same_v<BroadcastBlock, typename
QueueType::BlockType>) {
+ block.first->unref(this, local_state->_shared_state,
allocated_bytes, channel_id);
} else {
- block->unref(local_state->_shared_state, allocated_bytes,
channel_id);
+ block->unref(this, local_state->_shared_state, allocated_bytes,
channel_id);
DCHECK_EQ(block->ref_value(), 0);
}
}
}
-template <typename BlockType>
-bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState*
local_state,
- BlockType& block, bool* eos,
vectorized::Block* data_block,
- int channel_id) {
+template <typename QueueType>
+bool Exchanger<QueueType>::_dequeue_data(RuntimeProfile::Counter*
dequeue_rows_counter,
+ RuntimeProfile::Counter*
get_block_failed_counter,
+ LocalExchangeSourceLocalState*
local_state,
+ typename QueueType::BlockType& block,
bool* eos,
+ vectorized::Block* data_block, int
channel_id) {
if (local_state == nullptr) {
- if (!_dequeue_data(block, eos, data_block, channel_id)) {
- throw Exception(ErrorCode::INTERNAL_ERROR, "Exchanger has no data:
{}",
- data_queue_debug_string(channel_id));
+ return _dequeue_data(dequeue_rows_counter, get_block_failed_counter,
block, eos, data_block,
+ channel_id);
+ }
+ int64_t rows = 0;
+ Defer defer {[&]() {
+ if (rows > 0) {
+ if (dequeue_rows_counter) {
+ COUNTER_UPDATE(dequeue_rows_counter, rows);
+ }
+ } else {
+ if (get_block_failed_counter) {
+ COUNTER_UPDATE(get_block_failed_counter, 1);
+ }
}
- return true;
- }
+ }};
bool all_finished = _running_sink_operators == 0;
- if (_data_queue[channel_id].try_dequeue(block)) {
- if constexpr (std::is_same_v<PartitionedBlock, BlockType> ||
- std::is_same_v<BroadcastBlock, BlockType>) {
+ if (_data_queue[channel_id]->try_dequeue(block)) {
+ if constexpr (std::is_same_v<PartitionedBlock, typename
QueueType::BlockType> ||
+ std::is_same_v<BroadcastBlock, typename
QueueType::BlockType>) {
+ rows = block.second.length;
+ } else {
+ rows = block->data_block.rows();
+ }
+
+ if constexpr (std::is_same_v<PartitionedBlock, typename
QueueType::BlockType> ||
+ std::is_same_v<BroadcastBlock, typename
QueueType::BlockType>) {
local_state->_shared_state->sub_mem_usage(channel_id,
block.first->data_block.allocated_bytes());
} else {
local_state->_shared_state->sub_mem_usage(channel_id,
block->data_block.allocated_bytes());
data_block->swap(block->data_block);
- block->unref(local_state->_shared_state,
data_block->allocated_bytes(), channel_id);
+ block->unref(this, local_state->_shared_state,
data_block->allocated_bytes(),
+ channel_id);
DCHECK_EQ(block->ref_value(), 0);
}
return true;
} else if (all_finished) {
*eos = true;
} else {
std::unique_lock l(*_m[channel_id]);
- if (_data_queue[channel_id].try_dequeue(block)) {
- if constexpr (std::is_same_v<PartitionedBlock, BlockType> ||
- std::is_same_v<BroadcastBlock, BlockType>) {
+ if (_data_queue[channel_id]->try_dequeue(block)) {
Review Comment:
对于local shuffle来说,我们这个queue,为啥不能统一成exchange sink 里那个channel的概念?
相当于我们的shuffle的逻辑是共用的,都是放到channel里,区别只是channel 是基于内存的,read write,还是基于网络的read
write
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]