HappenLee commented on code in PR #46181:
URL: https://github.com/apache/doris/pull/46181#discussion_r1912875247
##########
be/src/pipeline/exec/analytic_sink_operator.cpp:
##########
@@ -266,73 +714,133 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState*
state, vectorized::Block
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)input_block->rows());
- local_state._shared_state->input_eos = eos;
- if (local_state._shared_state->input_eos && input_block->rows() == 0) {
- local_state._dependency->set_ready_to_read();
- local_state._dependency->block();
- return Status::OK();
+ local_state._input_eos = eos;
+ local_state._remove_unused_rows();
+ RETURN_IF_ERROR(_add_input_block(state, input_block));
+ RETURN_IF_ERROR(local_state._execute_impl());
+ if (local_state._input_eos) {
+ std::unique_lock<std::mutex>
lc(local_state._shared_state->sink_eos_lock);
+ local_state._shared_state->sink_eos = true;
+ local_state._dependency->set_ready_to_read(); // ready for source to
read
}
+ return Status::OK();
+}
- local_state._shared_state->input_block_first_row_positions.emplace_back(
- local_state._shared_state->input_total_rows);
+Status AnalyticSinkOperatorX::_add_input_block(doris::RuntimeState* state,
+ vectorized::Block* input_block)
{
+ if (input_block->rows() <= 0) {
+ return Status::OK();
+ }
+ auto& local_state = get_local_state(state);
+
local_state._input_block_first_row_positions.emplace_back(local_state._input_total_rows);
size_t block_rows = input_block->rows();
- local_state._shared_state->input_total_rows += block_rows;
- local_state._shared_state->all_block_end.block_num =
- local_state._shared_state->input_blocks.size();
- local_state._shared_state->all_block_end.row_num = block_rows;
- local_state._shared_state->all_block_end.pos =
local_state._shared_state->input_total_rows;
-
- if (local_state._shared_state->origin_cols
- .empty()) { //record origin columns, maybe be after this,
could cast some column but no need to save
+ local_state._input_total_rows += block_rows;
+
+ // record origin columns, maybe be after this, could cast some column but
no need to output
+ if (local_state._input_col_ids.empty()) {
for (int c = 0; c < input_block->columns(); ++c) {
- local_state._shared_state->origin_cols.emplace_back(c);
+ local_state._input_col_ids.emplace_back(c);
}
}
{
SCOPED_TIMER(local_state._compute_agg_data_timer);
- for (size_t i = 0; i < _agg_functions_size;
- ++i) { //insert _agg_input_columns, execute calculate for its
+ //insert _agg_input_columns, execute calculate for its, and those
columns maybe could remove have used data
+ for (size_t i = 0; i < _agg_functions_size; ++i) {
for (size_t j = 0; j < local_state._agg_expr_ctxs[i].size(); ++j) {
- RETURN_IF_ERROR(_insert_range_column(
- input_block, local_state._agg_expr_ctxs[i][j],
-
local_state._shared_state->agg_input_columns[i][j].get(), block_rows));
+ RETURN_IF_ERROR(_insert_range_column(input_block,
local_state._agg_expr_ctxs[i][j],
+
local_state._agg_input_columns[i][j].get(),
+ block_rows));
}
}
}
{
SCOPED_TIMER(local_state._compute_partition_by_timer);
for (size_t i = 0; i < local_state._partition_by_eq_expr_ctxs.size();
++i) {
- int result_col_id = -1;
-
RETURN_IF_ERROR(local_state._partition_by_eq_expr_ctxs[i]->execute(input_block,
-
&result_col_id));
- DCHECK_GE(result_col_id, 0);
- local_state._shared_state->partition_by_column_idxs[i] =
result_col_id;
+ RETURN_IF_ERROR(
+ _insert_range_column(input_block,
local_state._partition_by_eq_expr_ctxs[i],
+
local_state._partition_by_columns[i].get(), block_rows));
}
}
{
SCOPED_TIMER(local_state._compute_order_by_timer);
for (size_t i = 0; i < local_state._order_by_eq_expr_ctxs.size(); ++i)
{
- int result_col_id = -1;
- RETURN_IF_ERROR(
-
local_state._order_by_eq_expr_ctxs[i]->execute(input_block, &result_col_id));
- DCHECK_GE(result_col_id, 0);
- local_state._shared_state->ordey_by_column_idxs[i] = result_col_id;
+ RETURN_IF_ERROR(_insert_range_column(input_block,
local_state._order_by_eq_expr_ctxs[i],
+
local_state._order_by_columns[i].get(),
+ block_rows));
+ }
+ }
+
+ {
+ SCOPED_TIMER(local_state._compute_order_by_function_timer);
+ // should change the order by exprs to range column, IF FE have
support range window
+ for (size_t i = 0; i < local_state._order_by_eq_expr_ctxs.size(); ++i)
{
Review Comment:
Why is the same expr evaluated twice here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]