This is an automated email from the ASF dual-hosted git repository. mrhhsg pushed a commit to branch spill_and_reserve305 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve305 by this push: new db7b3a681f2 [refactor](branch-30) improve the execute performance about analytic operator #48681 db7b3a681f2 is described below commit db7b3a681f2ff13e2cae7dca175d73333eeb2fc6 Author: Hu Shenggang <hushengg...@selectdb.com> AuthorDate: Tue Apr 8 18:42:57 2025 +0800 [refactor](branch-30) improve the execute performance about analytic operator #48681 --- be/src/pipeline/exec/analytic_sink_operator.cpp | 10 +++---- be/src/pipeline/exec/analytic_source_operator.cpp | 32 +++++++++++------------ be/src/pipeline/exec/analytic_source_operator.h | 2 ++ 3 files changed, 21 insertions(+), 23 deletions(-) diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index 549b080e910..b3d1ceaa8ff 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -287,12 +287,8 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block local_state._shared_state->all_block_end.row_num = block_rows; local_state._shared_state->all_block_end.pos = local_state._shared_state->input_total_rows; - if (local_state._shared_state->origin_cols - .empty()) { //record origin columns, maybe be after this, could cast some column but no need to save - for (int c = 0; c < input_block->columns(); ++c) { - local_state._shared_state->origin_cols.emplace_back(c); - } - } + // record origin columns, maybe be after this, could cast some column but no need to output + auto column_to_keep = input_block->columns(); { SCOPED_TIMER(local_state._compute_agg_data_timer); @@ -327,6 +323,8 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block } } + vectorized::Block::erase_useless_column(input_block, column_to_keep); + COUNTER_UPDATE(local_state._memory_used_counter, input_block->allocated_bytes()); //TODO: if need improvement, the is a tips to maintain a free queue, diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp index cef0dedc8c1..4c455b94c9c 100644 --- a/be/src/pipeline/exec/analytic_source_operator.cpp +++ b/be/src/pipeline/exec/analytic_source_operator.cpp @@ -179,10 +179,16 @@ Status AnalyticLocalState::open(RuntimeState* state) { auto& p = _parent->cast<AnalyticSourceOperatorX>(); _agg_functions_size = p._agg_functions.size(); + _offsets_of_aggregate_states.resize(_agg_functions_size); + _result_column_nullable_flags.resize(_agg_functions_size); _agg_functions.resize(p._agg_functions.size()); for (size_t i = 0; i < _agg_functions.size(); i++) { _agg_functions[i] = p._agg_functions[i]->clone(state, state->obj_pool()); + _offsets_of_aggregate_states[i] = p._offsets_of_aggregate_states[i]; + _result_column_nullable_flags[i] = + !_agg_functions[i]->function()->get_return_type()->is_nullable() && + _agg_functions[i]->data_type()->is_nullable(); } _fn_place_ptr = _agg_arena_pool->aligned_alloc(p._total_size_of_aggregate_states, @@ -282,7 +288,6 @@ void AnalyticLocalState::_destroy_agg_status() { void AnalyticLocalState::_execute_for_win_func(int64_t partition_start, int64_t partition_end, int64_t frame_start, int64_t frame_end) { - SCOPED_TIMER(_execute_timer); for (size_t i = 0; i < _agg_functions_size; ++i) { std::vector<const vectorized::IColumn*> agg_columns; for (int j = 0; j < _shared_state->agg_input_columns[i].size(); ++j) { @@ -290,9 +295,8 @@ void AnalyticLocalState::_execute_for_win_func(int64_t partition_start, int64_t } _agg_functions[i]->function()->add_range_single_place( partition_start, partition_end, frame_start, frame_end, - _fn_place_ptr + - _parent->cast<AnalyticSourceOperatorX>()._offsets_of_aggregate_states[i], - agg_columns.data(), _agg_arena_pool.get()); + _fn_place_ptr + _offsets_of_aggregate_states[i], agg_columns.data(), + _agg_arena_pool.get()); // If the end is not greater than the start, the current window should be empty. _current_window_empty = @@ -301,7 +305,6 @@ void AnalyticLocalState::_execute_for_win_func(int64_t partition_start, int64_t } void AnalyticLocalState::_insert_result_info(int64_t current_block_rows) { - SCOPED_TIMER(_get_result_timer); int64_t current_block_row_pos = _shared_state->input_block_first_row_positions[_output_block_index]; int64_t get_result_start = _shared_state->current_row_position - current_block_row_pos; @@ -321,12 +324,9 @@ void AnalyticLocalState::_insert_result_info(int64_t current_block_rows) { _shared_state->current_row_position++; } - const auto& offsets_of_aggregate_states = - _parent->cast<AnalyticSourceOperatorX>()._offsets_of_aggregate_states; for (size_t i = 0; i < _agg_functions_size; ++i) { for (size_t j = get_result_start; j < _window_end_position; ++j) { - if (!_agg_functions[i]->function()->get_return_type()->is_nullable() && - _result_window_columns[i]->is_nullable()) { + if (_result_column_nullable_flags[i]) { if (_current_window_empty) { _result_window_columns[i]->insert_default(); } else { @@ -334,13 +334,14 @@ void AnalyticLocalState::_insert_result_info(int64_t current_block_rows) { _result_window_columns[i].get()); dst->get_null_map_data().push_back(0); _agg_functions[i]->insert_result_info( - _fn_place_ptr + offsets_of_aggregate_states[i], + _fn_place_ptr + _offsets_of_aggregate_states[i], &dst->get_nested_column()); } - continue; + } else { + _agg_functions[i]->insert_result_info( + _fn_place_ptr + _offsets_of_aggregate_states[i], + _result_window_columns[i].get()); } - _agg_functions[i]->insert_result_info(_fn_place_ptr + offsets_of_aggregate_states[i], - _result_window_columns[i].get()); } } } @@ -447,10 +448,7 @@ bool AnalyticLocalState::init_next_partition(BlockRowPos found_partition_end) { Status AnalyticLocalState::output_current_block(vectorized::Block* block) { block->swap(std::move(_shared_state->input_blocks[_output_block_index])); - _blocks_memory_usage->add(-block->allocated_bytes()); - if (_shared_state->origin_cols.size() < block->columns()) { - block->erase_not_in(_shared_state->origin_cols); - } + _blocks_memory_usage->add(block->allocated_bytes()); DCHECK(_parent->cast<AnalyticSourceOperatorX>()._change_to_nullable_flags.size() == _result_window_columns.size()); diff --git a/be/src/pipeline/exec/analytic_source_operator.h b/be/src/pipeline/exec/analytic_source_operator.h index 56c664cec61..639a27ffb7c 100644 --- a/be/src/pipeline/exec/analytic_source_operator.h +++ b/be/src/pipeline/exec/analytic_source_operator.h @@ -94,6 +94,8 @@ private: BlockRowPos _partition_by_start; std::unique_ptr<vectorized::Arena> _agg_arena_pool; std::vector<vectorized::AggFnEvaluator*> _agg_functions; + std::vector<size_t> _offsets_of_aggregate_states; + std::vector<bool> _result_column_nullable_flags; RuntimeProfile::Counter* _evaluation_timer = nullptr; RuntimeProfile::Counter* _execute_timer = nullptr; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org