Gabriel39 commented on code in PR #35682:
URL: https://github.com/apache/doris/pull/35682#discussion_r1621617587


##########
be/src/pipeline/exec/sort_sink_operator.cpp:
##########
@@ -185,4 +186,18 @@ void SortSinkOperatorX::reset(RuntimeState* state) {
     auto& local_state = get_local_state(state);
     local_state._shared_state->sorter->reset();
 }
+
+Status SortSinkOperatorX::build_merger(RuntimeState* state,
+                                       
std::unique_ptr<vectorized::VSortedRunMerger>& merger,
+                                       RuntimeProfile* profile) {
+    vectorized::VSortExecExprs vsort_exec_exprs;
+    RETURN_IF_ERROR(vsort_exec_exprs.init(_sort_info, state->obj_pool()));
+    RETURN_IF_ERROR(vsort_exec_exprs.prepare(state, _child_x->row_desc(), 
_row_descriptor));
+    RETURN_IF_ERROR(vsort_exec_exprs.open(state));

Review Comment:
   `open` should be called in the local state's `open` function.



##########
be/src/pipeline/local_exchange/local_exchanger.cpp:
##########
@@ -243,6 +245,59 @@ Status PassToOneExchanger::get_block(RuntimeState* state, 
vectorized::Block* blo
     return Status::OK();
 }
 
+Status LocalMergeSortExchanger::sink(RuntimeState* state, vectorized::Block* 
in_block, bool eos,
+                                     LocalExchangeSinkLocalState& local_state) 
{
+    vectorized::Block new_block(in_block->clone_empty());
+    new_block.swap(*in_block);
+    
_data_queue[state->per_fragment_instance_idx()].enqueue(std::move(new_block));
+    local_state._shared_state->set_ready_to_read(0);
+    return Status::OK();
+}
+
+Status LocalMergeSortExchanger::build_merger(SortSourceOperatorX* sort_source,
+                                             RuntimeState* state) {
+    auto* sort_sink = sort_source->sort_sink();
+    if (!sort_sink) {
+        return Status::InternalError("can not get sort_sink");
+    }
+
+    RETURN_IF_ERROR(sort_sink->build_merger(state, _merger, &_fake_profile));

Review Comment:
   Move `build_merger` into `SortSourceOperatorX`



##########
be/src/pipeline/exec/sort_source_operator.h:
##########
@@ -46,10 +48,16 @@ class SortSourceOperatorX final : public 
OperatorX<SortLocalState> {
 
     bool is_source() const override { return true; }
 
+    bool merge_by_exchange() const { return _merge_by_exchange; }
     const vectorized::SortDescription& get_sort_description(RuntimeState* 
state) const;
 
+    void set_sort_sink(SortSinkOperatorX* sort_sink) { _sort_sink = sort_sink; 
}
+    SortSinkOperatorX* sort_sink() { return _sort_sink; }
+
 private:
     friend class SortLocalState;
+    const bool _merge_by_exchange;
+    SortSinkOperatorX* _sort_sink = nullptr;

Review Comment:
   We should not keep a `_sort_sink` here.



##########
be/src/pipeline/local_exchange/local_exchanger.cpp:
##########
@@ -243,6 +245,59 @@ Status PassToOneExchanger::get_block(RuntimeState* state, 
vectorized::Block* blo
     return Status::OK();
 }
 
+Status LocalMergeSortExchanger::sink(RuntimeState* state, vectorized::Block* 
in_block, bool eos,
+                                     LocalExchangeSinkLocalState& local_state) 
{
+    vectorized::Block new_block(in_block->clone_empty());
+    new_block.swap(*in_block);
+    
_data_queue[state->per_fragment_instance_idx()].enqueue(std::move(new_block));
+    local_state._shared_state->set_ready_to_read(0);
+    return Status::OK();
+}
+
+Status LocalMergeSortExchanger::build_merger(SortSourceOperatorX* sort_source,
+                                             RuntimeState* state) {
+    auto* sort_sink = sort_source->sort_sink();
+    if (!sort_sink) {
+        return Status::InternalError("can not get sort_sink");
+    }
+
+    RETURN_IF_ERROR(sort_sink->build_merger(state, _merger, &_fake_profile));
+    for (int channel_id = 0; channel_id < _num_partitions; channel_id++) {
+        vectorized::BlockSupplier block_supplier = [&, id = 
channel_id](vectorized::Block* block,
+                                                                        bool* 
eos) {
+            vectorized::Block next_block;
+            if (_running_sink_operators == 0) {
+                if (_data_queue[id].try_dequeue(next_block)) {
+                    *block = std::move(next_block);
+                } else {
+                    *eos = true;
+                }
+            } else if (_data_queue[id].try_dequeue(next_block)) {
+                *block = std::move(next_block);
+            }
+            return Status::OK();
+        };
+        _child_block_suppliers.push_back(block_supplier);
+    }
+    return Status::OK();
+}
+
+Status LocalMergeSortExchanger::get_block(RuntimeState* state, 
vectorized::Block* block, bool* eos,
+                                          LocalExchangeSourceLocalState& 
local_state) {
+    if (state->per_fragment_instance_idx() != 0) {

Review Comment:
   Use `local_state._channel_id` here instead of `state->per_fragment_instance_idx()`.



##########
be/src/pipeline/local_exchange/local_exchanger.cpp:
##########
@@ -243,6 +245,59 @@ Status PassToOneExchanger::get_block(RuntimeState* state, 
vectorized::Block* blo
     return Status::OK();
 }
 
+Status LocalMergeSortExchanger::sink(RuntimeState* state, vectorized::Block* 
in_block, bool eos,
+                                     LocalExchangeSinkLocalState& local_state) 
{
+    vectorized::Block new_block(in_block->clone_empty());
+    new_block.swap(*in_block);
+    
_data_queue[state->per_fragment_instance_idx()].enqueue(std::move(new_block));
+    local_state._shared_state->set_ready_to_read(0);
+    return Status::OK();
+}
+
+Status LocalMergeSortExchanger::build_merger(SortSourceOperatorX* sort_source,
+                                             RuntimeState* state) {
+    auto* sort_sink = sort_source->sort_sink();
+    if (!sort_sink) {
+        return Status::InternalError("can not get sort_sink");
+    }
+
+    RETURN_IF_ERROR(sort_sink->build_merger(state, _merger, &_fake_profile));

Review Comment:
   Why do we need a `_fake_profile`?



##########
be/src/pipeline/exec/exchange_sink_operator.cpp:
##########
@@ -664,4 +666,14 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, 
Status exec_status) {
     return Base::close(state, exec_status);
 }
 
+DataDistribution ExchangeSinkOperatorX::required_data_distribution() const {
+    if (_child_x) {
+        if (auto sort_source = 
std::dynamic_pointer_cast<SortSourceOperatorX>(_child_x);

Review Comment:
   We also need a session variable to control this.



##########
be/src/pipeline/local_exchange/local_exchanger.cpp:
##########
@@ -243,6 +245,59 @@ Status PassToOneExchanger::get_block(RuntimeState* state, 
vectorized::Block* blo
     return Status::OK();
 }
 
+Status LocalMergeSortExchanger::sink(RuntimeState* state, vectorized::Block* 
in_block, bool eos,
+                                     LocalExchangeSinkLocalState& local_state) 
{
+    vectorized::Block new_block(in_block->clone_empty());
+    new_block.swap(*in_block);
+    
_data_queue[state->per_fragment_instance_idx()].enqueue(std::move(new_block));
+    local_state._shared_state->set_ready_to_read(0);
+    return Status::OK();
+}
+
+Status LocalMergeSortExchanger::build_merger(SortSourceOperatorX* sort_source,
+                                             RuntimeState* state) {
+    auto* sort_sink = sort_source->sort_sink();
+    if (!sort_sink) {
+        return Status::InternalError("can not get sort_sink");
+    }
+
+    RETURN_IF_ERROR(sort_sink->build_merger(state, _merger, &_fake_profile));
+    for (int channel_id = 0; channel_id < _num_partitions; channel_id++) {
+        vectorized::BlockSupplier block_supplier = [&, id = 
channel_id](vectorized::Block* block,
+                                                                        bool* 
eos) {
+            vectorized::Block next_block;
+            if (_running_sink_operators == 0) {
+                if (_data_queue[id].try_dequeue(next_block)) {
+                    *block = std::move(next_block);
+                } else {
+                    *eos = true;
+                }
+            } else if (_data_queue[id].try_dequeue(next_block)) {
+                *block = std::move(next_block);
+            }
+            return Status::OK();
+        };
+        _child_block_suppliers.push_back(block_supplier);
+    }
+    return Status::OK();
+}
+
+Status LocalMergeSortExchanger::get_block(RuntimeState* state, 
vectorized::Block* block, bool* eos,
+                                          LocalExchangeSourceLocalState& 
local_state) {
+    if (state->per_fragment_instance_idx() != 0) {
+        *eos = true;
+        return Status::OK();
+    }
+    DCHECK(_merger);
+    if (!_has_prepare) {
+        RETURN_IF_ERROR(_merger->prepare(_child_block_suppliers));
+        _merger->set_pipeline_engine_enabled(true);
+        _has_prepare = true;
+    }
+    RETURN_IF_ERROR(_merger->get_next(block, eos));

Review Comment:
   `block` should be pushed to the free blocks for memory reuse.



##########
be/src/pipeline/local_exchange/local_exchanger.cpp:
##########
@@ -243,6 +245,59 @@ Status PassToOneExchanger::get_block(RuntimeState* state, 
vectorized::Block* blo
     return Status::OK();
 }
 
+Status LocalMergeSortExchanger::sink(RuntimeState* state, vectorized::Block* 
in_block, bool eos,
+                                     LocalExchangeSinkLocalState& local_state) 
{
+    vectorized::Block new_block(in_block->clone_empty());
+    new_block.swap(*in_block);
+    
_data_queue[state->per_fragment_instance_idx()].enqueue(std::move(new_block));

Review Comment:
   Use `local_state._channel_id` to replace `state->per_fragment_instance_idx()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to