Gabriel39 commented on code in PR #35682: URL: https://github.com/apache/doris/pull/35682#discussion_r1621617587
########## be/src/pipeline/exec/sort_sink_operator.cpp: ########## @@ -185,4 +186,18 @@ void SortSinkOperatorX::reset(RuntimeState* state) { auto& local_state = get_local_state(state); local_state._shared_state->sorter->reset(); } + +Status SortSinkOperatorX::build_merger(RuntimeState* state, + std::unique_ptr<vectorized::VSortedRunMerger>& merger, + RuntimeProfile* profile) { + vectorized::VSortExecExprs vsort_exec_exprs; + RETURN_IF_ERROR(vsort_exec_exprs.init(_sort_info, state->obj_pool())); + RETURN_IF_ERROR(vsort_exec_exprs.prepare(state, _child_x->row_desc(), _row_descriptor)); + RETURN_IF_ERROR(vsort_exec_exprs.open(state)); Review Comment: `open` should be called in the local state's `open` function ########## be/src/pipeline/local_exchange/local_exchanger.cpp: ########## @@ -243,6 +245,59 @@ Status PassToOneExchanger::get_block(RuntimeState* state, vectorized::Block* blo return Status::OK(); } +Status LocalMergeSortExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos, + LocalExchangeSinkLocalState& local_state) { + vectorized::Block new_block(in_block->clone_empty()); + new_block.swap(*in_block); + _data_queue[state->per_fragment_instance_idx()].enqueue(std::move(new_block)); + local_state._shared_state->set_ready_to_read(0); + return Status::OK(); +} + +Status LocalMergeSortExchanger::build_merger(SortSourceOperatorX* sort_source, + RuntimeState* state) { + auto* sort_sink = sort_source->sort_sink(); + if (!sort_sink) { + return Status::InternalError("can not get sort_sink"); + } + + RETURN_IF_ERROR(sort_sink->build_merger(state, _merger, &_fake_profile)); Review Comment: Move `build_merger` into `SortSourceOperatorX` ########## be/src/pipeline/exec/sort_source_operator.h: ########## @@ -46,10 +48,16 @@ class SortSourceOperatorX final : public OperatorX<SortLocalState> { bool is_source() const override { return true; } + bool merge_by_exchange() const { return _merge_by_exchange; } const vectorized::SortDescription& 
get_sort_description(RuntimeState* state) const; + void set_sort_sink(SortSinkOperatorX* sort_sink) { _sort_sink = sort_sink; } + SortSinkOperatorX* sort_sink() { return _sort_sink; } + private: friend class SortLocalState; + const bool _merge_by_exchange; + SortSinkOperatorX* _sort_sink = nullptr; Review Comment: we should not keep a `_sort_sink` here ########## be/src/pipeline/local_exchange/local_exchanger.cpp: ########## @@ -243,6 +245,59 @@ Status PassToOneExchanger::get_block(RuntimeState* state, vectorized::Block* blo return Status::OK(); } +Status LocalMergeSortExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos, + LocalExchangeSinkLocalState& local_state) { + vectorized::Block new_block(in_block->clone_empty()); + new_block.swap(*in_block); + _data_queue[state->per_fragment_instance_idx()].enqueue(std::move(new_block)); + local_state._shared_state->set_ready_to_read(0); + return Status::OK(); +} + +Status LocalMergeSortExchanger::build_merger(SortSourceOperatorX* sort_source, + RuntimeState* state) { + auto* sort_sink = sort_source->sort_sink(); + if (!sort_sink) { + return Status::InternalError("can not get sort_sink"); + } + + RETURN_IF_ERROR(sort_sink->build_merger(state, _merger, &_fake_profile)); + for (int channel_id = 0; channel_id < _num_partitions; channel_id++) { + vectorized::BlockSupplier block_supplier = [&, id = channel_id](vectorized::Block* block, + bool* eos) { + vectorized::Block next_block; + if (_running_sink_operators == 0) { + if (_data_queue[id].try_dequeue(next_block)) { + *block = std::move(next_block); + } else { + *eos = true; + } + } else if (_data_queue[id].try_dequeue(next_block)) { + *block = std::move(next_block); + } + return Status::OK(); + }; + _child_block_suppliers.push_back(block_supplier); + } + return Status::OK(); +} + +Status LocalMergeSortExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos, + LocalExchangeSourceLocalState& local_state) { + if 
(state->per_fragment_instance_idx() != 0) { Review Comment: Use `local_state._channel_id` here instead of `state->per_fragment_instance_idx()`. ########## be/src/pipeline/local_exchange/local_exchanger.cpp: ########## @@ -243,6 +245,59 @@ Status PassToOneExchanger::get_block(RuntimeState* state, vectorized::Block* blo return Status::OK(); } +Status LocalMergeSortExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos, + LocalExchangeSinkLocalState& local_state) { + vectorized::Block new_block(in_block->clone_empty()); + new_block.swap(*in_block); + _data_queue[state->per_fragment_instance_idx()].enqueue(std::move(new_block)); + local_state._shared_state->set_ready_to_read(0); + return Status::OK(); +} + +Status LocalMergeSortExchanger::build_merger(SortSourceOperatorX* sort_source, + RuntimeState* state) { + auto* sort_sink = sort_source->sort_sink(); + if (!sort_sink) { + return Status::InternalError("can not get sort_sink"); + } + + RETURN_IF_ERROR(sort_sink->build_merger(state, _merger, &_fake_profile)); Review Comment: Why do we need a `_fake_profile`? 
########## be/src/pipeline/exec/exchange_sink_operator.cpp: ########## @@ -664,4 +666,14 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) { return Base::close(state, exec_status); } +DataDistribution ExchangeSinkOperatorX::required_data_distribution() const { + if (_child_x) { + if (auto sort_source = std::dynamic_pointer_cast<SortSourceOperatorX>(_child_x); Review Comment: We also need a session variable to control this behavior. ########## be/src/pipeline/local_exchange/local_exchanger.cpp: ########## @@ -243,6 +245,59 @@ Status PassToOneExchanger::get_block(RuntimeState* state, vectorized::Block* blo return Status::OK(); } +Status LocalMergeSortExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos, + LocalExchangeSinkLocalState& local_state) { + vectorized::Block new_block(in_block->clone_empty()); + new_block.swap(*in_block); + _data_queue[state->per_fragment_instance_idx()].enqueue(std::move(new_block)); + local_state._shared_state->set_ready_to_read(0); + return Status::OK(); +} + +Status LocalMergeSortExchanger::build_merger(SortSourceOperatorX* sort_source, + RuntimeState* state) { + auto* sort_sink = sort_source->sort_sink(); + if (!sort_sink) { + return Status::InternalError("can not get sort_sink"); + } + + RETURN_IF_ERROR(sort_sink->build_merger(state, _merger, &_fake_profile)); + for (int channel_id = 0; channel_id < _num_partitions; channel_id++) { + vectorized::BlockSupplier block_supplier = [&, id = channel_id](vectorized::Block* block, + bool* eos) { + vectorized::Block next_block; + if (_running_sink_operators == 0) { + if (_data_queue[id].try_dequeue(next_block)) { + *block = std::move(next_block); + } else { + *eos = true; + } + } else if (_data_queue[id].try_dequeue(next_block)) { + *block = std::move(next_block); + } + return Status::OK(); + }; + _child_block_suppliers.push_back(block_supplier); + } + return Status::OK(); +} + +Status LocalMergeSortExchanger::get_block(RuntimeState* state, vectorized::Block* 
block, bool* eos, + LocalExchangeSourceLocalState& local_state) { + if (state->per_fragment_instance_idx() != 0) { + *eos = true; + return Status::OK(); + } + DCHECK(_merger); + if (!_has_prepare) { + RETURN_IF_ERROR(_merger->prepare(_child_block_suppliers)); + _merger->set_pipeline_engine_enabled(true); + _has_prepare = true; + } + RETURN_IF_ERROR(_merger->get_next(block, eos)); Review Comment: `block` should be pushed into the free blocks so its memory can be reused ########## be/src/pipeline/local_exchange/local_exchanger.cpp: ########## @@ -243,6 +245,59 @@ Status PassToOneExchanger::get_block(RuntimeState* state, vectorized::Block* blo return Status::OK(); } +Status LocalMergeSortExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos, + LocalExchangeSinkLocalState& local_state) { + vectorized::Block new_block(in_block->clone_empty()); + new_block.swap(*in_block); + _data_queue[state->per_fragment_instance_idx()].enqueue(std::move(new_block)); Review Comment: Use `local_state._channel_id` to replace `state->per_fragment_instance_idx()` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org