HappenLee commented on code in PR #46181:
URL: https://github.com/apache/doris/pull/46181#discussion_r1912699700


##########
be/src/pipeline/exec/analytic_sink_operator.cpp:
##########
@@ -42,180 +110,521 @@ Status AnalyticSinkLocalState::open(RuntimeState* state) 
{
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
     auto& p = _parent->cast<AnalyticSinkOperatorX>();
-    
_shared_state->partition_by_column_idxs.resize(p._partition_by_eq_expr_ctxs.size());
-    
_shared_state->ordey_by_column_idxs.resize(p._order_by_eq_expr_ctxs.size());
-
-    size_t agg_size = p._agg_expr_ctxs.size();
-    _agg_expr_ctxs.resize(agg_size);
-    _shared_state->agg_input_columns.resize(agg_size);
-    for (int i = 0; i < agg_size; ++i) {
-        _shared_state->agg_input_columns[i].resize(p._num_agg_input[i]);
+
+    _agg_functions_size = p._agg_functions_size;
+    _agg_expr_ctxs.resize(_agg_functions_size);
+    _agg_functions.resize(_agg_functions_size);
+    _agg_input_columns.resize(_agg_functions_size);
+    _offsets_of_aggregate_states.resize(_agg_functions_size);
+    _result_column_nullable_flags.resize(_agg_functions_size);
+
+    for (int i = 0; i < _agg_functions_size; ++i) {
+        _agg_functions[i] = p._agg_functions[i]->clone(state, 
state->obj_pool());
+        _agg_input_columns[i].resize(p._num_agg_input[i]);
         _agg_expr_ctxs[i].resize(p._agg_expr_ctxs[i].size());
         for (int j = 0; j < p._agg_expr_ctxs[i].size(); ++j) {
             RETURN_IF_ERROR(p._agg_expr_ctxs[i][j]->clone(state, 
_agg_expr_ctxs[i][j]));
+            _agg_input_columns[i][j] = 
_agg_expr_ctxs[i][j]->root()->data_type()->create_column();
         }
-
-        for (size_t j = 0; j < _agg_expr_ctxs[i].size(); ++j) {
-            _shared_state->agg_input_columns[i][j] =
-                    _agg_expr_ctxs[i][j]->root()->data_type()->create_column();
-        }
+        _offsets_of_aggregate_states[i] = p._offsets_of_aggregate_states[i];
+        _result_column_nullable_flags[i] =
+                
!_agg_functions[i]->function()->get_return_type()->is_nullable() &&
+                _agg_functions[i]->data_type()->is_nullable();
     }
-    _partition_by_eq_expr_ctxs.resize(p._partition_by_eq_expr_ctxs.size());
-    for (size_t i = 0; i < _partition_by_eq_expr_ctxs.size(); i++) {
+
+    _partition_exprs_size = p._partition_by_eq_expr_ctxs.size();
+    _partition_by_eq_expr_ctxs.resize(_partition_exprs_size);
+    _partition_by_columns.resize(_partition_exprs_size);
+    for (size_t i = 0; i < _partition_exprs_size; i++) {
         RETURN_IF_ERROR(
                 p._partition_by_eq_expr_ctxs[i]->clone(state, 
_partition_by_eq_expr_ctxs[i]));
+        _partition_by_columns[i] =
+                
_partition_by_eq_expr_ctxs[i]->root()->data_type()->create_column();
     }
-    _order_by_eq_expr_ctxs.resize(p._order_by_eq_expr_ctxs.size());
-    for (size_t i = 0; i < _order_by_eq_expr_ctxs.size(); i++) {
+
+    _order_by_exprs_size = p._order_by_eq_expr_ctxs.size();
+    _order_by_eq_expr_ctxs.resize(_order_by_exprs_size);
+    _order_by_columns.resize(_order_by_exprs_size);
+    for (size_t i = 0; i < _order_by_exprs_size; i++) {
         RETURN_IF_ERROR(p._order_by_eq_expr_ctxs[i]->clone(state, 
_order_by_eq_expr_ctxs[i]));
+        _order_by_columns[i] = 
_order_by_eq_expr_ctxs[i]->root()->data_type()->create_column();
+    }
+
+    // only support one order by column, so need two columns upper and lower 
bound
+    // _range_result_columns.resize(2);
+    _range_result_columns.resize(_order_by_exprs_size);
+    // should change the order by exprs to range column, IF FE have support 
range window
+    for (size_t i = 0; i < _order_by_exprs_size; i++) {
+        // RETURN_IF_ERROR(p._order_by_eq_expr_ctxs[i]->clone(state, 
_order_by_eq_expr_ctxs[i]));
+        _range_result_columns[i] = 
_order_by_eq_expr_ctxs[i]->root()->data_type()->create_column();
     }
+
+    _fn_place_ptr = 
_agg_arena_pool->aligned_alloc(p._total_size_of_aggregate_states,
+                                                   p._align_aggregate_states);
+    _create_agg_status();
     return Status::OK();
 }
 
-bool AnalyticSinkLocalState::_whether_need_next_partition(BlockRowPos& 
found_partition_end) {
-    auto& shared_state = *_shared_state;
-    if (shared_state.input_eos ||
-        (shared_state.current_row_position <
-         shared_state.partition_by_end.pos)) { //now still have partition data
-        return false;
+Status AnalyticSinkLocalState::close(RuntimeState* state, Status exec_status) {
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_close_timer);
+    if (_closed) {
+        return Status::OK();
+    }
+
+    _destroy_agg_status();
+    _agg_arena_pool = nullptr;
+
+    std::vector<vectorized::MutableColumnPtr> tmp_result_window_columns;

Review Comment:
   Why not just call `clear()` here instead of declaring the temporary `tmp_result_window_columns` vector?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to