This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a1c7f6314 [improvement](analytic) improve memory counter (#14890)
5a1c7f6314 is described below

commit 5a1c7f6314fef589bfc078ca4253a19d2fff87f9
Author: TengJianPing <18241664+jackte...@users.noreply.github.com>
AuthorDate: Fri Dec 9 14:13:17 2022 +0800

    [improvement](analytic) improve memory counter (#14890)
---
 be/src/vec/exec/vanalytic_eval_node.cpp | 32 +++++++++++++++++---------------
 be/src/vec/exec/vanalytic_eval_node.h   |  9 ++++-----
 2 files changed, 21 insertions(+), 20 deletions(-)

diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp
index ed1ac17767..5fa25cb1c6 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -287,8 +287,8 @@ Status VAnalyticEvalNode::_get_next_for_partition(RuntimeState* state, Block* bl
         SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
         size_t current_block_rows = _input_blocks[_output_block_index].rows();
         if (next_partition) {
-            _executor.execute(_partition_by_start, _partition_by_end, _partition_by_start,
-                              _partition_by_end);
+            _executor.execute(_partition_by_start.pos, _partition_by_end.pos,
+                              _partition_by_start.pos, _partition_by_end.pos);
         }
         _executor.insert_result(current_block_rows);
         if (_window_end_position == current_block_rows) {
@@ -312,7 +312,8 @@ Status VAnalyticEvalNode::_get_next_for_range(RuntimeState* state, Block* block,
                _window_end_position < current_block_rows) {
             if (_current_row_position >= _order_by_end.pos) {
                 _update_order_by_range();
-                _executor.execute(_order_by_start, _order_by_end, _order_by_start, _order_by_end);
+                _executor.execute(_order_by_start.pos, _order_by_end.pos, _order_by_start.pos,
+                                  _order_by_end.pos);
             }
             _executor.insert_result(current_block_rows);
         }
@@ -335,25 +336,26 @@ Status VAnalyticEvalNode::_get_next_for_rows(RuntimeState* state, Block* block,
         size_t current_block_rows = _input_blocks[_output_block_index].rows();
         while (_current_row_position < _partition_by_end.pos &&
                _window_end_position < current_block_rows) {
-            BlockRowPos range_start, range_end;
+            int64_t range_start, range_end;
             if (!_window.__isset.window_start &&
                 _window.window_end.type ==
                         TAnalyticWindowBoundaryType::
                                 CURRENT_ROW) { //[preceding, current_row],[current_row, following]
-                range_start.pos = _current_row_position;
-                range_end.pos = _current_row_position +
-                                1; //going on calculate,add up data, no need to reset state
+                range_start = _current_row_position;
+                range_end = _current_row_position +
+                            1; //going on calculate,add up data, no need to reset state
             } else {
                 _reset_agg_status();
                 if (!_window.__isset
                              .window_start) { //[preceding, offset]        --unbound: [preceding, following]
-                    range_start.pos = _partition_by_start.pos;
+                    range_start = _partition_by_start.pos;
                 } else {
-                    range_start.pos = _current_row_position + _rows_start_offset;
+                    range_start = _current_row_position + _rows_start_offset;
                 }
-                range_end.pos = _current_row_position + _rows_end_offset + 1;
+                range_end = _current_row_position + _rows_end_offset + 1;
             }
-            _executor.execute(_partition_by_start, _partition_by_end, range_start, range_end);
+            _executor.execute(_partition_by_start.pos, _partition_by_end.pos, range_start,
+                              range_end);
             _executor.insert_result(current_block_rows);
         }
         if (_window_end_position == current_block_rows) {
@@ -595,6 +597,7 @@ Status VAnalyticEvalNode::_output_current_block(Block* block) {
 
     block->swap(std::move(_input_blocks[_output_block_index]));
     _blocks_memory_usage->add(-block->allocated_bytes());
+    mem_tracker_held()->consume(-block->allocated_bytes());
     if (_origin_cols.size() < block->columns()) {
         block->erase_not_in(_origin_cols);
     }
@@ -618,16 +621,15 @@ Status VAnalyticEvalNode::_output_current_block(Block* block) {
 
 //now is execute for lead/lag row_number/rank/dense_rank/ntile functions
 //sum min max count avg first_value last_value functions
-void VAnalyticEvalNode::_execute_for_win_func(BlockRowPos partition_start,
-                                              BlockRowPos partition_end, BlockRowPos frame_start,
-                                              BlockRowPos frame_end) {
+void VAnalyticEvalNode::_execute_for_win_func(int64_t partition_start, int64_t partition_end,
+                                              int64_t frame_start, int64_t frame_end) {
     for (size_t i = 0; i < _agg_functions_size; ++i) {
         std::vector<const IColumn*> _agg_columns;
         for (int j = 0; j < _agg_intput_columns[i].size(); ++j) {
             _agg_columns.push_back(_agg_intput_columns[i][j].get());
         }
         _agg_functions[i]->function()->add_range_single_place(
-                partition_start.pos, partition_end.pos, frame_start.pos, frame_end.pos,
+                partition_start, partition_end, frame_start, frame_end,
                 _fn_place_ptr + _offsets_of_aggregate_states[i], _agg_columns.data(), nullptr);
     }
 }
diff --git a/be/src/vec/exec/vanalytic_eval_node.h b/be/src/vec/exec/vanalytic_eval_node.h
index c39340e94d..a6824a108a 100644
--- a/be/src/vec/exec/vanalytic_eval_node.h
+++ b/be/src/vec/exec/vanalytic_eval_node.h
@@ -58,8 +58,8 @@ private:
     Status _get_next_for_range(RuntimeState* state, Block* block, bool* eos);
     Status _get_next_for_partition(RuntimeState* state, Block* block, bool* eos);
 
-    void _execute_for_win_func(BlockRowPos partition_start, BlockRowPos partition_end,
-                               BlockRowPos frame_start, BlockRowPos frame_end);
+    void _execute_for_win_func(int64_t partition_start, int64_t partition_end, int64_t frame_start,
+                               int64_t frame_end);
 
     Status _reset_agg_status();
     Status _init_result_columns();
@@ -80,9 +80,8 @@ private:
     bool whether_need_next_partition(BlockRowPos found_partition_end);
 
     std::string debug_window_bound_string(TAnalyticWindowBoundary b);
-    using vectorized_execute =
-            std::function<void(BlockRowPos peer_group_start, BlockRowPos peer_group_end,
-                               BlockRowPos frame_start, BlockRowPos frame_end)>;
+    using vectorized_execute = std::function<void(int64_t peer_group_start, int64_t peer_group_end,
+                                                  int64_t frame_start, int64_t frame_end)>;
     using vectorized_get_next = std::function<Status(RuntimeState* state, Block* block, bool* eos)>;
     using vectorized_get_result = std::function<void(int64_t current_block_rows)>;
     using vectorized_closer = std::function<void()>;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to