This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_and_reserve305
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve305 by this push:
     new db7b3a681f2 [refactor](branch-30) improve the execute performance about analytic operator #48681
db7b3a681f2 is described below

commit db7b3a681f2ff13e2cae7dca175d73333eeb2fc6
Author: Hu Shenggang <hushengg...@selectdb.com>
AuthorDate: Tue Apr 8 18:42:57 2025 +0800

    [refactor](branch-30) improve the execute performance about analytic operator #48681
---
 be/src/pipeline/exec/analytic_sink_operator.cpp   | 10 +++----
 be/src/pipeline/exec/analytic_source_operator.cpp | 32 +++++++++++------------
 be/src/pipeline/exec/analytic_source_operator.h   |  2 ++
 3 files changed, 21 insertions(+), 23 deletions(-)

diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp
index 549b080e910..b3d1ceaa8ff 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -287,12 +287,8 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* 
state, vectorized::Block
     local_state._shared_state->all_block_end.row_num = block_rows;
     local_state._shared_state->all_block_end.pos = 
local_state._shared_state->input_total_rows;
 
-    if (local_state._shared_state->origin_cols
-                .empty()) { //record origin columns, maybe be after this, 
could cast some column but no need to save
-        for (int c = 0; c < input_block->columns(); ++c) {
-            local_state._shared_state->origin_cols.emplace_back(c);
-        }
-    }
+    // record origin columns, maybe be after this, could cast some column but 
no need to output
+    auto column_to_keep = input_block->columns();
 
     {
         SCOPED_TIMER(local_state._compute_agg_data_timer);
@@ -327,6 +323,8 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* 
state, vectorized::Block
         }
     }
 
+    vectorized::Block::erase_useless_column(input_block, column_to_keep);
+
     COUNTER_UPDATE(local_state._memory_used_counter, 
input_block->allocated_bytes());
 
     //TODO: if need improvement, the is a tips to maintain a free queue,
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp
index cef0dedc8c1..4c455b94c9c 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -179,10 +179,16 @@ Status AnalyticLocalState::open(RuntimeState* state) {
 
     auto& p = _parent->cast<AnalyticSourceOperatorX>();
     _agg_functions_size = p._agg_functions.size();
+    _offsets_of_aggregate_states.resize(_agg_functions_size);
+    _result_column_nullable_flags.resize(_agg_functions_size);
 
     _agg_functions.resize(p._agg_functions.size());
     for (size_t i = 0; i < _agg_functions.size(); i++) {
         _agg_functions[i] = p._agg_functions[i]->clone(state, 
state->obj_pool());
+        _offsets_of_aggregate_states[i] = p._offsets_of_aggregate_states[i];
+        _result_column_nullable_flags[i] =
+                
!_agg_functions[i]->function()->get_return_type()->is_nullable() &&
+                _agg_functions[i]->data_type()->is_nullable();
     }
 
     _fn_place_ptr = 
_agg_arena_pool->aligned_alloc(p._total_size_of_aggregate_states,
@@ -282,7 +288,6 @@ void AnalyticLocalState::_destroy_agg_status() {
 
 void AnalyticLocalState::_execute_for_win_func(int64_t partition_start, 
int64_t partition_end,
                                                int64_t frame_start, int64_t 
frame_end) {
-    SCOPED_TIMER(_execute_timer);
     for (size_t i = 0; i < _agg_functions_size; ++i) {
         std::vector<const vectorized::IColumn*> agg_columns;
         for (int j = 0; j < _shared_state->agg_input_columns[i].size(); ++j) {
@@ -290,9 +295,8 @@ void AnalyticLocalState::_execute_for_win_func(int64_t 
partition_start, int64_t
         }
         _agg_functions[i]->function()->add_range_single_place(
                 partition_start, partition_end, frame_start, frame_end,
-                _fn_place_ptr +
-                        
_parent->cast<AnalyticSourceOperatorX>()._offsets_of_aggregate_states[i],
-                agg_columns.data(), _agg_arena_pool.get());
+                _fn_place_ptr + _offsets_of_aggregate_states[i], 
agg_columns.data(),
+                _agg_arena_pool.get());
 
         // If the end is not greater than the start, the current window should 
be empty.
         _current_window_empty =
@@ -301,7 +305,6 @@ void AnalyticLocalState::_execute_for_win_func(int64_t 
partition_start, int64_t
 }
 
 void AnalyticLocalState::_insert_result_info(int64_t current_block_rows) {
-    SCOPED_TIMER(_get_result_timer);
     int64_t current_block_row_pos =
             
_shared_state->input_block_first_row_positions[_output_block_index];
     int64_t get_result_start = _shared_state->current_row_position - 
current_block_row_pos;
@@ -321,12 +324,9 @@ void AnalyticLocalState::_insert_result_info(int64_t 
current_block_rows) {
         _shared_state->current_row_position++;
     }
 
-    const auto& offsets_of_aggregate_states =
-            
_parent->cast<AnalyticSourceOperatorX>()._offsets_of_aggregate_states;
     for (size_t i = 0; i < _agg_functions_size; ++i) {
         for (size_t j = get_result_start; j < _window_end_position; ++j) {
-            if 
(!_agg_functions[i]->function()->get_return_type()->is_nullable() &&
-                _result_window_columns[i]->is_nullable()) {
+            if (_result_column_nullable_flags[i]) {
                 if (_current_window_empty) {
                     _result_window_columns[i]->insert_default();
                 } else {
@@ -334,13 +334,14 @@ void AnalyticLocalState::_insert_result_info(int64_t 
current_block_rows) {
                             _result_window_columns[i].get());
                     dst->get_null_map_data().push_back(0);
                     _agg_functions[i]->insert_result_info(
-                            _fn_place_ptr + offsets_of_aggregate_states[i],
+                            _fn_place_ptr + _offsets_of_aggregate_states[i],
                             &dst->get_nested_column());
                 }
-                continue;
+            } else {
+                _agg_functions[i]->insert_result_info(
+                        _fn_place_ptr + _offsets_of_aggregate_states[i],
+                        _result_window_columns[i].get());
             }
-            _agg_functions[i]->insert_result_info(_fn_place_ptr + 
offsets_of_aggregate_states[i],
-                                                  
_result_window_columns[i].get());
         }
     }
 }
@@ -447,10 +448,7 @@ bool AnalyticLocalState::init_next_partition(BlockRowPos 
found_partition_end) {
 
 Status AnalyticLocalState::output_current_block(vectorized::Block* block) {
     block->swap(std::move(_shared_state->input_blocks[_output_block_index]));
-    _blocks_memory_usage->add(-block->allocated_bytes());
-    if (_shared_state->origin_cols.size() < block->columns()) {
-        block->erase_not_in(_shared_state->origin_cols);
-    }
+    _blocks_memory_usage->add(block->allocated_bytes());
 
     
DCHECK(_parent->cast<AnalyticSourceOperatorX>()._change_to_nullable_flags.size()
 ==
            _result_window_columns.size());
diff --git a/be/src/pipeline/exec/analytic_source_operator.h b/be/src/pipeline/exec/analytic_source_operator.h
index 56c664cec61..639a27ffb7c 100644
--- a/be/src/pipeline/exec/analytic_source_operator.h
+++ b/be/src/pipeline/exec/analytic_source_operator.h
@@ -94,6 +94,8 @@ private:
     BlockRowPos _partition_by_start;
     std::unique_ptr<vectorized::Arena> _agg_arena_pool;
     std::vector<vectorized::AggFnEvaluator*> _agg_functions;
+    std::vector<size_t> _offsets_of_aggregate_states;
+    std::vector<bool> _result_column_nullable_flags;
 
     RuntimeProfile::Counter* _evaluation_timer = nullptr;
     RuntimeProfile::Counter* _execute_timer = nullptr;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to