This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 5a1c7f6314 [improvement](analytic) improve memory counter (#14890) 5a1c7f6314 is described below commit 5a1c7f6314fef589bfc078ca4253a19d2fff87f9 Author: TengJianPing <18241664+jackte...@users.noreply.github.com> AuthorDate: Fri Dec 9 14:13:17 2022 +0800 [improvement](analytic) improve memory counter (#14890) --- be/src/vec/exec/vanalytic_eval_node.cpp | 32 +++++++++++++++++--------------- be/src/vec/exec/vanalytic_eval_node.h | 9 ++++----- 2 files changed, 21 insertions(+), 20 deletions(-) diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp index ed1ac17767..5fa25cb1c6 100644 --- a/be/src/vec/exec/vanalytic_eval_node.cpp +++ b/be/src/vec/exec/vanalytic_eval_node.cpp @@ -287,8 +287,8 @@ Status VAnalyticEvalNode::_get_next_for_partition(RuntimeState* state, Block* bl SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh()); size_t current_block_rows = _input_blocks[_output_block_index].rows(); if (next_partition) { - _executor.execute(_partition_by_start, _partition_by_end, _partition_by_start, - _partition_by_end); + _executor.execute(_partition_by_start.pos, _partition_by_end.pos, + _partition_by_start.pos, _partition_by_end.pos); } _executor.insert_result(current_block_rows); if (_window_end_position == current_block_rows) { @@ -312,7 +312,8 @@ Status VAnalyticEvalNode::_get_next_for_range(RuntimeState* state, Block* block, _window_end_position < current_block_rows) { if (_current_row_position >= _order_by_end.pos) { _update_order_by_range(); - _executor.execute(_order_by_start, _order_by_end, _order_by_start, _order_by_end); + _executor.execute(_order_by_start.pos, _order_by_end.pos, _order_by_start.pos, + _order_by_end.pos); } _executor.insert_result(current_block_rows); } @@ -335,25 +336,26 @@ Status VAnalyticEvalNode::_get_next_for_rows(RuntimeState* state, Block* block, size_t current_block_rows = _input_blocks[_output_block_index].rows(); while (_current_row_position < _partition_by_end.pos && _window_end_position < current_block_rows) { - BlockRowPos range_start, range_end; + int64_t range_start, range_end; if (!_window.__isset.window_start && _window.window_end.type == TAnalyticWindowBoundaryType:: CURRENT_ROW) { //[preceding, current_row],[current_row, following] - range_start.pos = _current_row_position; - range_end.pos = _current_row_position + - 1; //going on calculate,add up data, no need to reset state + range_start = _current_row_position; + range_end = _current_row_position + + 1; //going on calculate,add up data, no need to reset state } else { _reset_agg_status(); if (!_window.__isset .window_start) { //[preceding, offset] --unbound: [preceding, following] - range_start.pos = _partition_by_start.pos; + range_start = _partition_by_start.pos; } else { - range_start.pos = _current_row_position + _rows_start_offset; + range_start = _current_row_position + _rows_start_offset; } - range_end.pos = _current_row_position + _rows_end_offset + 1; + range_end = _current_row_position + _rows_end_offset + 1; } - _executor.execute(_partition_by_start, _partition_by_end, range_start, range_end); + _executor.execute(_partition_by_start.pos, _partition_by_end.pos, range_start, + range_end); _executor.insert_result(current_block_rows); } if (_window_end_position == current_block_rows) { @@ -595,6 +597,7 @@ Status VAnalyticEvalNode::_output_current_block(Block* block) { block->swap(std::move(_input_blocks[_output_block_index])); _blocks_memory_usage->add(-block->allocated_bytes()); + mem_tracker_held()->consume(-block->allocated_bytes()); if (_origin_cols.size() < block->columns()) { block->erase_not_in(_origin_cols); } @@ -618,16 +621,15 @@ Status VAnalyticEvalNode::_output_current_block(Block* block) { //now is execute for lead/lag row_number/rank/dense_rank/ntile functions //sum min max count avg first_value last_value functions -void VAnalyticEvalNode::_execute_for_win_func(BlockRowPos partition_start, - BlockRowPos partition_end, BlockRowPos frame_start, - BlockRowPos frame_end) { +void VAnalyticEvalNode::_execute_for_win_func(int64_t partition_start, int64_t partition_end, + int64_t frame_start, int64_t frame_end) { for (size_t i = 0; i < _agg_functions_size; ++i) { std::vector<const IColumn*> _agg_columns; for (int j = 0; j < _agg_intput_columns[i].size(); ++j) { _agg_columns.push_back(_agg_intput_columns[i][j].get()); } _agg_functions[i]->function()->add_range_single_place( - partition_start.pos, partition_end.pos, frame_start.pos, frame_end.pos, + partition_start, partition_end, frame_start, frame_end, _fn_place_ptr + _offsets_of_aggregate_states[i], _agg_columns.data(), nullptr); } } diff --git a/be/src/vec/exec/vanalytic_eval_node.h b/be/src/vec/exec/vanalytic_eval_node.h index c39340e94d..a6824a108a 100644 --- a/be/src/vec/exec/vanalytic_eval_node.h +++ b/be/src/vec/exec/vanalytic_eval_node.h @@ -58,8 +58,8 @@ private: Status _get_next_for_range(RuntimeState* state, Block* block, bool* eos); Status _get_next_for_partition(RuntimeState* state, Block* block, bool* eos); - void _execute_for_win_func(BlockRowPos partition_start, BlockRowPos partition_end, - BlockRowPos frame_start, BlockRowPos frame_end); + void _execute_for_win_func(int64_t partition_start, int64_t partition_end, int64_t frame_start, + int64_t frame_end); Status _reset_agg_status(); Status _init_result_columns(); @@ -80,9 +80,8 @@ private: bool whether_need_next_partition(BlockRowPos found_partition_end); std::string debug_window_bound_string(TAnalyticWindowBoundary b); - using vectorized_execute = - std::function<void(BlockRowPos peer_group_start, BlockRowPos peer_group_end, - BlockRowPos frame_start, BlockRowPos frame_end)>; + using vectorized_execute = std::function<void(int64_t peer_group_start, int64_t peer_group_end, + int64_t frame_start, int64_t frame_end)>; using vectorized_get_next = std::function<Status(RuntimeState* state, Block* block, bool* eos)>; using vectorized_get_result = std::function<void(int64_t current_block_rows)>; using vectorized_closer = std::function<void()>; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org