This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 24eff502145 [Chore](exchange) remove some unused code of Channel
(#42095)
24eff502145 is described below
commit 24eff50214594d886e9a6a7c7f5bfeaf9665546c
Author: Pxl <[email protected]>
AuthorDate: Mon Oct 21 14:03:10 2024 +0800
[Chore](exchange) remove some unused code of Channel (#42095)
## Proposed changes
Remove some unused code from the Channel class.
---
be/src/vec/sink/vdata_stream_sender.cpp | 86 +--------------------------------
be/src/vec/sink/vdata_stream_sender.h | 41 +++++-----------
2 files changed, 14 insertions(+), 113 deletions(-)
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index 496e68c97f0..b139c503c9a 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -174,21 +174,6 @@ Status PipChannel::send_current_block(bool eos, Status
exec_status) {
return Status::OK();
}
-template <typename Parent>
-Status Channel<Parent>::send_current_block(bool eos, Status exec_status) {
- // FIXME: Now, local exchange will cause the performance problem is in a
multi-threaded scenario
- // so this feature is turned off here by default. We need to re-examine
this logic
- if (is_local()) {
- return send_local_block(exec_status, eos);
- }
- if (eos) {
- RETURN_IF_ERROR(_serializer.serialize_block(_ch_cur_pb_block, 1));
- }
- RETURN_IF_ERROR(send_remote_block(_ch_cur_pb_block, eos, exec_status));
- ch_roll_pb_block();
- return Status::OK();
-}
-
template <typename Parent>
Status Channel<Parent>::send_local_block(Status exec_status, bool eos) {
SCOPED_TIMER(_parent->local_send_timer());
@@ -228,71 +213,6 @@ Status Channel<Parent>::send_local_block(Block* block,
bool can_be_moved) {
}
}
-template <typename Parent>
-Status Channel<Parent>::send_remote_block(PBlock* block, bool eos, Status
exec_status) {
- if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>)
{
- COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
- }
- SCOPED_TIMER(_parent->brpc_send_timer());
-
- if (_send_remote_block_callback == nullptr) {
- _send_remote_block_callback =
DummyBrpcCallback<PTransmitDataResult>::create_shared();
- } else {
- RETURN_IF_ERROR(_wait_last_brpc());
- _send_remote_block_callback->cntl_->Reset();
- }
- VLOG_ROW << "Channel<Parent>::send_batch() instance_id=" <<
print_id(_fragment_instance_id)
- << " dest_node=" << _dest_node_id << " to_host=" <<
_brpc_dest_addr.hostname
- << " _packet_seq=" << _packet_seq << " row_desc=" <<
_row_desc.debug_string();
-
- _brpc_request->set_eos(eos);
- if (!exec_status.ok()) {
- exec_status.to_protobuf(_brpc_request->mutable_exec_status());
- }
- if (block != nullptr && !block->column_metas().empty()) {
- _brpc_request->set_allocated_block(block);
- }
- _brpc_request->set_packet_seq(_packet_seq++);
-
- _send_remote_block_callback->cntl_->set_timeout_ms(_brpc_timeout_ms);
- if (config::exchange_sink_ignore_eovercrowded) {
- _send_remote_block_callback->cntl_->ignore_eovercrowded();
- }
-
- {
- auto send_remote_block_closure =
- AutoReleaseClosure<PTransmitDataParams,
DummyBrpcCallback<PTransmitDataResult>>::
- create_unique(_brpc_request,
_send_remote_block_callback);
- if (enable_http_send_block(*_brpc_request)) {
- RETURN_IF_ERROR(transmit_block_httpv2(
- _state->exec_env(), std::move(send_remote_block_closure),
_brpc_dest_addr));
- } else {
- transmit_blockv2(*_brpc_stub,
std::move(send_remote_block_closure));
- }
- }
-
- if (block != nullptr) {
- static_cast<void>(_brpc_request->release_block());
- }
- return Status::OK();
-}
-
-template <typename Parent>
-Status Channel<Parent>::add_rows(Block* block, const std::vector<uint32_t>&
rows, bool eos) {
- if (_fragment_instance_id.lo == -1) {
- return Status::OK();
- }
-
- bool serialized = false;
- RETURN_IF_ERROR(
- _serializer.next_serialized_block(block, _ch_cur_pb_block, 1,
&serialized, eos, &rows));
- if (serialized) {
- RETURN_IF_ERROR(send_current_block(false, Status::OK()));
- }
-
- return Status::OK();
-}
-
template <typename Parent>
Status Channel<Parent>::close_wait(RuntimeState* state) {
if (_need_close) {
@@ -309,8 +229,7 @@ Status Channel<Parent>::close_wait(RuntimeState* state) {
return Status::OK();
}
-template <typename Parent>
-Status Channel<Parent>::close_internal(Status exec_status) {
+Status PipChannel::close_internal(Status exec_status) {
if (!_need_close) {
return Status::OK();
}
@@ -343,8 +262,7 @@ Status Channel<Parent>::close_internal(Status exec_status) {
}
}
-template <typename Parent>
-Status Channel<Parent>::close(RuntimeState* state, Status exec_status) {
+Status PipChannel::close(RuntimeState* state, Status exec_status) {
if (_closed) {
return Status::OK();
}
diff --git a/be/src/vec/sink/vdata_stream_sender.h
b/be/src/vec/sink/vdata_stream_sender.h
index 43d00b0164a..b0b0e0dc182 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -138,30 +138,9 @@ public:
Status init(RuntimeState* state);
Status open(RuntimeState* state);
- // Asynchronously sends a row batch.
- // Returns the status of the most recently finished transmit_data
- // rpc (or OK if there wasn't one that hasn't been reported yet).
- // if batch is nullptr, send the eof packet
- virtual Status send_remote_block(PBlock* block, bool eos = false,
- Status exec_status = Status::OK());
-
- virtual Status
send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block,
- bool eos = false) {
- return Status::InternalError("Send BroadcastPBlockHolder is not
allowed!");
- }
-
- virtual Status add_rows(Block* block, const std::vector<uint32_t>& row,
bool eos);
-
- virtual Status send_current_block(bool eos, Status exec_status);
-
Status send_local_block(Status exec_status, bool eos = false);
Status send_local_block(Block* block, bool can_be_moved);
- // Flush buffered rows and close channel. This function don't wait the
response
- // of close operation, client should call close_wait() to finish channel's
close.
- // We split one close operation into two phases in order to make multiple
channels
- // can run parallel.
- Status close(RuntimeState* state, Status exec_status);
// Get close wait's response, to finish channel close operation.
Status close_wait(RuntimeState* state);
@@ -214,8 +193,6 @@ protected:
return _receiver_status;
}
- Status close_internal(Status exec_status);
-
Parent* _parent = nullptr;
const RowDescriptor& _row_desc;
@@ -288,17 +265,23 @@ public:
Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block = new
PBlock();
}
+ // Flush buffered rows and close channel. This function don't wait the
response
+ // of close operation, client should call close_wait() to finish channel's
close.
+ // We split one close operation into two phases in order to make multiple
channels
+ // can run parallel.
+ Status close(RuntimeState* state, Status exec_status);
+
+ Status close_internal(Status exec_status);
+
// Asynchronously sends a block
// Returns the status of the most recently finished transmit_data
// rpc (or OK if there wasn't one that hasn't been reported yet).
// if batch is nullptr, send the eof packet
- Status send_remote_block(PBlock* block, bool eos = false,
- Status exec_status = Status::OK()) override;
+ Status send_remote_block(PBlock* block, bool eos = false, Status
exec_status = Status::OK());
- Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block,
- bool eos = false) override;
+ Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block,
bool eos = false);
- Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos)
override {
+ Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos)
{
if
(Channel<pipeline::ExchangeSinkLocalState>::_fragment_instance_id.lo == -1) {
return Status::OK();
}
@@ -317,7 +300,7 @@ public:
}
// send _mutable_block
- Status send_current_block(bool eos, Status exec_status) override;
+ Status send_current_block(bool eos, Status exec_status);
void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) {
_buffer = buffer;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]