This is an automated email from the ASF dual-hosted git repository.
mrhhsg pushed a commit to branch spill_and_reserve305
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve305 by this
push:
new 9bf99e07ebf streaming window function
9bf99e07ebf is described below
commit 9bf99e07ebf9ff66fdd90d2fa0a92c7035a33d74
Author: zhangstar333 <[email protected]>
AuthorDate: Sun Apr 20 08:21:07 2025 +0800
streaming window function
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
be/src/common/config.cpp | 1 +
be/src/common/config.h | 1 +
be/src/pipeline/dependency.h | 34 +-
be/src/pipeline/exec/analytic_sink_operator.cpp | 888 +++++++++++++++++----
be/src/pipeline/exec/analytic_sink_operator.h | 171 +++-
be/src/pipeline/exec/analytic_source_operator.cpp | 594 +-------------
be/src/pipeline/exec/analytic_source_operator.h | 102 +--
.../vec/aggregate_functions/aggregate_function.h | 13 +
.../aggregate_function_window.h | 91 ++-
gensrc/thrift/PlanNodes.thrift | 2 +
.../window_functions/test_column_boundary.out | Bin 0 -> 119 bytes
.../window_functions/test_column_boundary.groovy | 54 ++
12 files changed, 1063 insertions(+), 888 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 1359ae51080..ca510308fa6 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -168,6 +168,7 @@ DEFINE_mBool(enable_query_memory_overcommit, "true");
DEFINE_mBool(disable_memory_gc, "false");
DEFINE_mBool(enable_stacktrace, "true");
+DEFINE_mBool(enable_streaming_analytic, "true");
DEFINE_mInt64(stacktrace_in_alloc_large_memory_bytes, "2147483648");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 60a718d1e70..f5af314fa3a 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -207,6 +207,7 @@ DECLARE_mBool(disable_memory_gc);
// if false, turn off all stacktrace
DECLARE_mBool(enable_stacktrace);
+DECLARE_mBool(enable_streaming_analytic);
// when alloc memory larger than stacktrace_in_alloc_large_memory_bytes,
default 2G,
// if alloc successful, will print a warning with stacktrace, but not prevent
memory alloc.
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index e010ad7e6cd..a731d42c5b8 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -573,41 +573,15 @@ struct MultiCastSharedState : public BasicSharedState,
void update_spill_stream_profiles(RuntimeProfile* source_profile) override;
};
-struct BlockRowPos {
- int64_t block_num {}; //the pos at which block
- int64_t row_num {}; //the pos at which row
- int64_t pos {}; //pos = all blocks size + row_num
- std::string debug_string() const {
- std::string res = "\t block_num: ";
- res += std::to_string(block_num);
- res += "\t row_num: ";
- res += std::to_string(row_num);
- res += "\t pos: ";
- res += std::to_string(pos);
- return res;
- }
-};
-
struct AnalyticSharedState : public BasicSharedState {
ENABLE_FACTORY_CREATOR(AnalyticSharedState)
public:
AnalyticSharedState() = default;
-
- int64_t current_row_position = 0;
- BlockRowPos partition_by_end;
- int64_t input_total_rows = 0;
- BlockRowPos all_block_end;
- std::vector<vectorized::Block> input_blocks;
- bool input_eos = false;
- BlockRowPos found_partition_end;
- std::vector<int64_t> origin_cols;
- std::vector<int64_t> input_block_first_row_positions;
- std::vector<std::vector<vectorized::MutableColumnPtr>> agg_input_columns;
-
- // TODO: maybe global?
- std::vector<int64_t> partition_by_column_idxs;
- std::vector<int64_t> ordey_by_column_idxs;
+ std::queue<vectorized::Block> blocks_buffer;
+ std::mutex buffer_mutex;
+ bool sink_eos = false;
+ std::mutex sink_eos_lock;
};
struct JoinSharedState : public BasicSharedState {
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp
b/be/src/pipeline/exec/analytic_sink_operator.cpp
index b3d1ceaa8ff..16382457f96 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -18,6 +18,10 @@
#include "analytic_sink_operator.h"
+#include <glog/logging.h>
+
+#include <cstddef>
+#include <cstdint>
#include <string>
#include "pipeline/exec/operator.h"
@@ -31,10 +35,77 @@ Status AnalyticSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
RETURN_IF_ERROR(PipelineXSinkLocalState<AnalyticSharedState>::init(state,
info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
- _evaluation_timer = ADD_TIMER(profile(), "GetPartitionBoundTime");
+ _evaluation_timer = ADD_TIMER(profile(), "EvaluationTime");
_compute_agg_data_timer = ADD_TIMER(profile(), "ComputeAggDataTime");
_compute_partition_by_timer = ADD_TIMER(profile(),
"ComputePartitionByTime");
_compute_order_by_timer = ADD_TIMER(profile(), "ComputeOrderByTime");
+ _compute_range_between_function_timer = ADD_TIMER(profile(),
"ComputeRangeBetweenTime");
+ _partition_search_timer = ADD_TIMER(profile(), "PartitionSearchTime");
+ _order_search_timer = ADD_TIMER(profile(), "OrderSearchTime");
+ _remove_rows_timer = ADD_TIMER(profile(), "RemoveRowsTime");
+ _remove_rows = ADD_COUNTER(profile(), "RemoveRows", TUnit::UNIT);
+ _remove_count = ADD_COUNTER(profile(), "RemoveCount", TUnit::UNIT);
+ _blocks_memory_usage =
+ profile()->AddHighWaterMarkCounter("Blocks", TUnit::BYTES,
"MemoryUsage", 1);
+ _agg_arena_pool = std::make_unique<vectorized::Arena>();
+ auto& p = _parent->cast<AnalyticSinkOperatorX>();
+ if (!p._has_window) { //haven't set window, Unbounded: [unbounded
preceding,unbounded following]
+ // For window frame `ROWS|RANGE BETWEEN UNBOUNDED PRECEDING AND
UNBOUNDED FOLLOWING`
+ _executor.get_next_impl =
&AnalyticSinkLocalState::_get_next_for_partition;
+ } else if (p._has_range_window) {
+ if (!p._has_window_start && !p._has_window_end) {
+ _executor.get_next_impl =
&AnalyticSinkLocalState::_get_next_for_partition;
+ } else {
+ if (!p._has_window_start &&
+ p._window.window_end.type ==
TAnalyticWindowBoundaryType::CURRENT_ROW) {
+ // For window frame `RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW`
+ _executor.get_next_impl =
&AnalyticSinkLocalState::_get_next_for_unbounded_range;
+ _streaming_mode = config::enable_streaming_analytic;
+ } else {
+ _executor.get_next_impl =
&AnalyticSinkLocalState::_get_next_for_range_between;
+ }
+ }
+ } else {
+ // haven't set start and end, same as PARTITION
+ if (!p._has_window_start && !p._has_window_end) {
+ _executor.get_next_impl =
&AnalyticSinkLocalState::_get_next_for_partition;
+ } else {
+ if (!p._has_window_start &&
+ p._window.window_end.type ==
TAnalyticWindowBoundaryType::CURRENT_ROW) {
+ _executor.get_next_impl =
&AnalyticSinkLocalState::_get_next_for_unbounded_rows;
+ _streaming_mode = true;
+ } else {
+ _executor.get_next_impl =
&AnalyticSinkLocalState::_get_next_for_sliding_rows;
+ }
+ }
+
+ if (p._has_window_start) { //calculate start boundary
+ TAnalyticWindowBoundary b = p._window.window_start;
+ if (b.__isset.rows_offset_value) { //[offset , ]
+ _rows_start_offset = b.rows_offset_value;
+ if (b.type == TAnalyticWindowBoundaryType::PRECEDING) {
+ _rows_start_offset *= -1;
//preceding--> negative
+ }
//current_row 0
+ } else {
//following positive
+ DCHECK_EQ(b.type, TAnalyticWindowBoundaryType::CURRENT_ROW);
//[current row, ]
+ _rows_start_offset = 0;
+ }
+ }
+
+ if (p._has_window_end) { //calculate end boundary
+ TAnalyticWindowBoundary b = p._window.window_end;
+ if (b.__isset.rows_offset_value) { //[ , offset]
+ _rows_end_offset = b.rows_offset_value;
+ if (b.type == TAnalyticWindowBoundaryType::PRECEDING) {
+ _rows_end_offset *= -1;
+ }
+ } else {
+ DCHECK_EQ(b.type, TAnalyticWindowBoundaryType::CURRENT_ROW);
//[ ,current row]
+ _rows_end_offset = 0;
+ }
+ }
+ }
+ profile()->add_info_string("streaming mode: ",
std::to_string(_streaming_mode));
return Status::OK();
}
@@ -43,159 +114,502 @@ Status AnalyticSinkLocalState::open(RuntimeState* state)
{
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
auto& p = _parent->cast<AnalyticSinkOperatorX>();
-
_shared_state->partition_by_column_idxs.resize(p._partition_by_eq_expr_ctxs.size());
-
_shared_state->ordey_by_column_idxs.resize(p._order_by_eq_expr_ctxs.size());
-
- size_t agg_size = p._agg_expr_ctxs.size();
- _agg_expr_ctxs.resize(agg_size);
- _shared_state->agg_input_columns.resize(agg_size);
- for (int i = 0; i < agg_size; ++i) {
- _shared_state->agg_input_columns[i].resize(p._num_agg_input[i]);
+
+ _agg_functions_size = p._agg_functions_size;
+ _agg_expr_ctxs.resize(_agg_functions_size);
+ _agg_functions.resize(_agg_functions_size);
+ _agg_input_columns.resize(_agg_functions_size);
+ _offsets_of_aggregate_states.resize(_agg_functions_size);
+ _result_column_nullable_flags.resize(_agg_functions_size);
+ _result_column_could_resize.resize(_agg_functions_size);
+
+ for (int i = 0; i < _agg_functions_size; ++i) {
+ _agg_functions[i] = p._agg_functions[i]->clone(state,
state->obj_pool());
+ _agg_input_columns[i].resize(p._num_agg_input[i]);
_agg_expr_ctxs[i].resize(p._agg_expr_ctxs[i].size());
for (int j = 0; j < p._agg_expr_ctxs[i].size(); ++j) {
RETURN_IF_ERROR(p._agg_expr_ctxs[i][j]->clone(state,
_agg_expr_ctxs[i][j]));
+ _agg_input_columns[i][j] =
_agg_expr_ctxs[i][j]->root()->data_type()->create_column();
}
-
- for (size_t j = 0; j < _agg_expr_ctxs[i].size(); ++j) {
- _shared_state->agg_input_columns[i][j] =
- _agg_expr_ctxs[i][j]->root()->data_type()->create_column();
+ _offsets_of_aggregate_states[i] = p._offsets_of_aggregate_states[i];
+ _result_column_nullable_flags[i] =
+
!_agg_functions[i]->function()->get_return_type()->is_nullable() &&
+ _agg_functions[i]->data_type()->is_nullable();
+ _result_column_could_resize[i] =
+ _agg_functions[i]->function()->result_column_could_resize();
+ if
(PARTITION_FUNCTION_SET.contains(_agg_functions[i]->function()->get_name())) {
+ _streaming_mode = false;
}
}
- _partition_by_eq_expr_ctxs.resize(p._partition_by_eq_expr_ctxs.size());
- for (size_t i = 0; i < _partition_by_eq_expr_ctxs.size(); i++) {
+
+ _partition_exprs_size = p._partition_by_eq_expr_ctxs.size();
+ _partition_by_eq_expr_ctxs.resize(_partition_exprs_size);
+ _partition_by_columns.resize(_partition_exprs_size);
+ for (size_t i = 0; i < _partition_exprs_size; i++) {
RETURN_IF_ERROR(
p._partition_by_eq_expr_ctxs[i]->clone(state,
_partition_by_eq_expr_ctxs[i]));
+ _partition_by_columns[i] =
+
_partition_by_eq_expr_ctxs[i]->root()->data_type()->create_column();
}
- _order_by_eq_expr_ctxs.resize(p._order_by_eq_expr_ctxs.size());
- for (size_t i = 0; i < _order_by_eq_expr_ctxs.size(); i++) {
+
+ _order_by_exprs_size = p._order_by_eq_expr_ctxs.size();
+ _order_by_eq_expr_ctxs.resize(_order_by_exprs_size);
+ _order_by_columns.resize(_order_by_exprs_size);
+ for (size_t i = 0; i < _order_by_exprs_size; i++) {
RETURN_IF_ERROR(p._order_by_eq_expr_ctxs[i]->clone(state,
_order_by_eq_expr_ctxs[i]));
+ _order_by_columns[i] =
_order_by_eq_expr_ctxs[i]->root()->data_type()->create_column();
}
+
+ // only one order by column is supported, so two columns are needed: the upper and lower
bound
+ _range_result_columns.resize(_range_between_expr_ctxs.size());
+ _range_between_expr_ctxs = p._range_between_expr_ctxs;
+ for (size_t i = 0; i < _range_between_expr_ctxs.size(); i++) {
+ RETURN_IF_ERROR(p._range_between_expr_ctxs[i]->clone(state,
_range_between_expr_ctxs[i]));
+ _range_result_columns[i] =
+
_range_between_expr_ctxs[i]->root()->data_type()->create_column();
+ }
+
+ _fn_place_ptr =
_agg_arena_pool->aligned_alloc(p._total_size_of_aggregate_states,
+ p._align_aggregate_states);
+ _create_agg_status();
return Status::OK();
}
-bool AnalyticSinkLocalState::_whether_need_next_partition(BlockRowPos&
found_partition_end) {
- auto& shared_state = *_shared_state;
- if (shared_state.input_eos ||
- (shared_state.current_row_position <
- shared_state.partition_by_end.pos)) { //now still have partition data
- return false;
+Status AnalyticSinkLocalState::close(RuntimeState* state, Status exec_status) {
+ SCOPED_TIMER(exec_time_counter());
+ SCOPED_TIMER(_close_timer);
+ if (_closed) {
+ return Status::OK();
+ }
+
+ _destroy_agg_status();
+ _agg_arena_pool = nullptr;
+ _fn_place_ptr = nullptr;
+ _result_window_columns.clear();
+ _agg_input_columns.clear();
+ _partition_by_columns.clear();
+ _order_by_columns.clear();
+ _range_result_columns.clear();
+ return PipelineXSinkLocalState<AnalyticSharedState>::close(state,
exec_status);
+}
+
+bool AnalyticSinkLocalState::_get_next_for_sliding_rows(int64_t batch_rows,
+ int64_t
current_block_base_pos) {
+ while (_current_row_position < _partition_by_pose.end) {
+ int64_t current_row_start = 0;
+ int64_t current_row_end = _current_row_position + _rows_end_offset + 1;
+
+ _reset_agg_status();
+ if
(!_parent->cast<AnalyticSinkOperatorX>()._window.__isset.window_start) {
+ current_row_start = _partition_by_pose.start;
+ } else {
+ current_row_start = _current_row_position + _rows_start_offset;
+ }
+ // Eg: rows between unbounded preceding and 10 preceding
+ // Make sure range_start <= range_end
+ current_row_start = std::min(current_row_start, current_row_end);
+ _execute_for_function(_partition_by_pose.start,
_partition_by_pose.end, current_row_start,
+ current_row_end);
+ int64_t pos = current_pos_in_block();
+ _insert_result_info(pos, pos + 1);
+ _current_row_position++;
+ if (_current_row_position - current_block_base_pos >= batch_rows) {
+ return true;
+ }
}
- if ((_partition_by_eq_expr_ctxs.empty() && !shared_state.input_eos) ||
- (found_partition_end.pos == 0)) { //no partition, get until fetch to
EOS
- return true;
+ return false;
+}
+
+bool AnalyticSinkLocalState::_get_next_for_unbounded_rows(int64_t batch_rows,
+ int64_t
current_block_base_pos) {
+ while (_current_row_position < _partition_by_pose.end) {
+ // [preceding, current_row] and [current_row, following] are rewritten to the same case,
+ // so the previous calculation result can be reused; therefore don't call the
_reset_agg_status function.
+ // Keep accumulating data on top of the existing state; there is no need to reset it.
+ _execute_for_function(_partition_by_pose.start, _partition_by_pose.end,
+ _current_row_position, _current_row_position +
1);
+ int64_t pos = current_pos_in_block();
+ _insert_result_info(pos, pos + 1);
+ _current_row_position++;
+ if (_current_row_position - current_block_base_pos >= batch_rows) {
+ return true;
+ }
+ }
+ return false;
+}
+
+bool AnalyticSinkLocalState::_get_next_for_partition(int64_t batch_rows,
+ int64_t
current_block_base_pos) {
+ if (_current_row_position == _partition_by_pose.start) {
+ _execute_for_function(_partition_by_pose.start, _partition_by_pose.end,
+ _partition_by_pose.start,
_partition_by_pose.end);
+ }
+
+ // the end pos may lie several blocks later, but output is produced one batch at a time
and should not exceed the partition end
+ auto window_end_pos = _current_row_position + batch_rows;
+ window_end_pos = std::min<int64_t>(window_end_pos, _partition_by_pose.end);
+
+ auto previous_window_frame_width = _current_row_position -
current_block_base_pos;
+ auto current_window_frame_width = window_end_pos - current_block_base_pos;
+ // should not exceed block batch size
+ current_window_frame_width = std::min<int64_t>(current_window_frame_width,
batch_rows);
+ auto real_deal_with_width = current_window_frame_width -
previous_window_frame_width;
+ int64_t pos = current_pos_in_block();
+ _insert_result_info(pos, pos + real_deal_with_width);
+ _current_row_position += real_deal_with_width;
+ return _current_row_position - current_block_base_pos >= batch_rows;
+}
+
+bool AnalyticSinkLocalState::_get_next_for_unbounded_range(int64_t batch_rows,
+ int64_t
current_block_base_pos) {
+ while (_current_row_position < _partition_by_pose.end) {
+ _update_order_by_range();
+ if (_current_row_position == _order_by_pose.start) {
+ _execute_for_function(_partition_by_pose.start,
_partition_by_pose.end,
+ _order_by_pose.start, _order_by_pose.end);
+ }
+ auto previous_window_frame_width = _current_row_position -
current_block_base_pos;
+ auto current_window_frame_width = _order_by_pose.end -
current_block_base_pos;
+ current_window_frame_width =
std::min<int64_t>(current_window_frame_width, batch_rows);
+ auto real_deal_with_width = current_window_frame_width -
previous_window_frame_width;
+ int64_t pos = current_pos_in_block();
+ _insert_result_info(pos, pos + real_deal_with_width);
+ _current_row_position += real_deal_with_width;
+ if (_current_row_position - current_block_base_pos >= batch_rows) {
+ return true;
+ }
+ }
+ return false;
+}
+
+bool AnalyticSinkLocalState::_get_next_for_range_between(int64_t batch_rows,
+ int64_t
current_block_base_pos) {
+ while (_current_row_position < _partition_by_pose.end) {
+ _reset_agg_status();
+ if
(!_parent->cast<AnalyticSinkOperatorX>()._window.__isset.window_start) {
+ _order_by_pose.start = _partition_by_pose.start;
+ } else {
+ _order_by_pose.start = find_first_not_equal(
+ _range_result_columns[0].get(), _order_by_columns[0].get(),
+ _current_row_position, _order_by_pose.start,
_partition_by_pose.end);
+ }
+
+ if
(!_parent->cast<AnalyticSinkOperatorX>()._window.__isset.window_end) {
+ _order_by_pose.end = _partition_by_pose.end;
+ } else {
+ _order_by_pose.end = find_first_not_equal(
+ _range_result_columns[1].get(), _order_by_columns[0].get(),
+ _current_row_position, _order_by_pose.end,
_partition_by_pose.end);
+ }
+ _execute_for_function(_partition_by_pose.start, _partition_by_pose.end,
+ _order_by_pose.start, _order_by_pose.end);
+ int64_t pos = current_pos_in_block();
+ _insert_result_info(pos, pos + 1);
+ _current_row_position++;
+ if (_current_row_position - current_block_base_pos >= batch_rows) {
+ return true;
+ }
}
- if (!_partition_by_eq_expr_ctxs.empty() &&
- found_partition_end.pos == shared_state.all_block_end.pos &&
- !shared_state.input_eos) { //current partition data calculate done
- return true;
+ if (_current_row_position == _partition_by_pose.end) {
+ _order_by_pose.start = _partition_by_pose.end; // update to next
partition pos
+ _order_by_pose.end = _partition_by_pose.end;
}
return false;
}
-//_partition_by_columns,_order_by_columns save in blocks, so if need to
calculate the boundary, may find in which blocks firstly
-BlockRowPos AnalyticSinkLocalState::_compare_row_to_find_end(int64_t idx,
BlockRowPos start,
- BlockRowPos end,
- bool
need_check_first) {
- auto& shared_state = *_shared_state;
- int64_t start_init_row_num = start.row_num;
- vectorized::ColumnPtr start_column =
-
shared_state.input_blocks[start.block_num].get_by_position(idx).column;
- vectorized::ColumnPtr start_next_block_column = start_column;
-
- DCHECK_LE(start.block_num, end.block_num);
- DCHECK_LE(start.block_num, shared_state.input_blocks.size() - 1);
- int64_t start_block_num = start.block_num;
- int64_t end_block_num = end.block_num;
- int64_t mid_blcok_num = end.block_num;
- // To fix this problem: https://github.com/apache/doris/issues/15951
- // in this case, the partition by column is last row of block, so it's
pointed to a new block at row = 0, range is: [left, right)
- // From the perspective of order by column, the two values are exactly
equal.
- // so the range will be get wrong because it's compare_at == 0 with next
block at row = 0
- if (need_check_first && end.block_num > 0 && end.row_num == 0) {
- end.block_num--;
- end_block_num--;
- end.row_num = shared_state.input_blocks[end_block_num].rows();
- }
- //binary search find in which block
- while (start_block_num < end_block_num) {
- mid_blcok_num = (start_block_num + end_block_num + 1) >> 1;
- start_next_block_column =
-
shared_state.input_blocks[mid_blcok_num].get_by_position(idx).column;
- //Compares (*this)[n] and rhs[m], this: start[init_row] rhs: mid[0]
- if (start_column->compare_at(start_init_row_num, 0,
*start_next_block_column, 1) == 0) {
- start_block_num = mid_blcok_num;
+Status AnalyticSinkLocalState::_execute_impl() {
+ while (_output_block_index < _input_blocks.size()) {
+ {
+ _get_partition_by_end();
+ // streaming_mode means there is no need to get all partition data first; data can be
calculated as it arrives
+ if (!_partition_by_pose.is_ended && !_streaming_mode) {
+ break;
+ }
+ _init_result_columns();
+ auto batch_rows = _input_blocks[_output_block_index].rows();
+ auto current_block_base_pos =
+ _input_block_first_row_positions[_output_block_index] -
_have_removed_rows;
+ bool should_output = false;
+
+ {
+ SCOPED_TIMER(_evaluation_timer);
+ should_output =
+ (this->*_executor.get_next_impl)(batch_rows,
current_block_base_pos);
+ }
+
+ if (should_output) {
+ vectorized::Block block;
+ _output_current_block(&block);
+ _refresh_buffer_and_dependency_state(&block);
+ }
+ if (_current_row_position == _partition_by_pose.end &&
_partition_by_pose.is_ended) {
+ _reset_state_for_next_partition();
+ }
+ }
+ }
+ return Status::OK();
+}
+
+void AnalyticSinkLocalState::_execute_for_function(int64_t partition_start,
int64_t partition_end,
+ int64_t frame_start,
int64_t frame_end) {
+ // here is the core function, should not add timer
+ // If the end is not greater than the start, the current window should be
empty.
+ _current_window_empty =
+ std::min(frame_end, partition_end) <= std::max(frame_start,
partition_start);
+
+ for (size_t i = 0; i < _agg_functions_size; ++i) {
+ if (_result_column_nullable_flags[i] && _current_window_empty) {
+ continue;
+ }
+ std::vector<const vectorized::IColumn*> agg_columns;
+ for (int j = 0; j < _agg_input_columns[i].size(); ++j) {
+ agg_columns.push_back(_agg_input_columns[i][j].get());
+ }
+ _agg_functions[i]->function()->add_range_single_place(
+ partition_start, partition_end, frame_start, frame_end,
+ _fn_place_ptr + _offsets_of_aggregate_states[i],
agg_columns.data(),
+ _agg_arena_pool.get());
+ }
+}
+
+void AnalyticSinkLocalState::_insert_result_info(int64_t start, int64_t end) {
+ // here is the core function, should not add timer
+ for (size_t i = 0; i < _agg_functions_size; ++i) {
+ if (_result_column_nullable_flags[i]) {
+ if (_current_window_empty) {
+ //TODO: need to check this logic
+ _result_window_columns[i]->insert_many_defaults(end - start);
+ } else {
+ auto* dst =
+
assert_cast<vectorized::ColumnNullable*>(_result_window_columns[i].get());
+ dst->get_null_map_data().add_num_element(0,
static_cast<uint32_t>(end - start));
+ _agg_functions[i]->function()->insert_result_into_range(
+ _fn_place_ptr + _offsets_of_aggregate_states[i],
dst->get_nested_column(),
+ start, end);
+ }
} else {
- end_block_num = mid_blcok_num - 1;
- }
- }
-
- // have check the start.block_num: start_column[start_init_row_num] with
mid_blcok_num start_next_block_column[0]
- // now next block must not be result, so need check with end_block_num:
start_next_block_column[last_row]
- if (end_block_num == mid_blcok_num - 1) {
- start_next_block_column =
-
shared_state.input_blocks[end_block_num].get_by_position(idx).column;
- int64_t block_size = shared_state.input_blocks[end_block_num].rows();
- if ((start_column->compare_at(start_init_row_num, block_size - 1,
*start_next_block_column,
- 1) == 0)) {
- start.block_num = end_block_num + 1;
- start.row_num = 0;
- return start;
- }
- }
-
- //check whether need get column again, maybe same as first init
- // if the start_block_num have move to forword, so need update start block
num and compare it from row_num=0
- if (start_block_num != start.block_num) {
- start_init_row_num = 0;
- start.block_num = start_block_num;
- start_column =
shared_state.input_blocks[start.block_num].get_by_position(idx).column;
- }
- //binary search, set start and end pos
- int64_t start_pos = start_init_row_num;
- int64_t end_pos = shared_state.input_blocks[start.block_num].rows();
- //if end_block_num haven't moved, only start_block_num go to the end block
- //so could use the end.row_num for binary search
- if (start.block_num == end.block_num) {
- end_pos = end.row_num;
- }
- while (start_pos < end_pos) {
- int64_t mid_pos = (start_pos + end_pos) >> 1;
- if (start_column->compare_at(start_init_row_num, mid_pos,
*start_column, 1)) {
- end_pos = mid_pos;
+ _agg_functions[i]->function()->insert_result_into_range(
+ _fn_place_ptr + _offsets_of_aggregate_states[i],
*_result_window_columns[i],
+ start, end);
+ }
+ }
+}
+
+void AnalyticSinkLocalState::_output_current_block(vectorized::Block* block) {
+ block->swap(std::move(_input_blocks[_output_block_index]));
+ _blocks_memory_usage->add(-block->allocated_bytes());
+
DCHECK(_parent->cast<AnalyticSinkOperatorX>()._change_to_nullable_flags.size()
==
+ _result_window_columns.size());
+ for (size_t i = 0; i < _result_window_columns.size(); ++i) {
+ DCHECK(_result_window_columns[i]);
+ DCHECK(_agg_functions[i]);
+ if
(_parent->cast<AnalyticSinkOperatorX>()._change_to_nullable_flags[i]) {
+ block->insert({make_nullable(std::move(_result_window_columns[i])),
+ make_nullable(_agg_functions[i]->data_type()), ""});
} else {
- start_pos = mid_pos + 1;
+ block->insert(
+ {std::move(_result_window_columns[i]),
_agg_functions[i]->data_type(), ""});
+ }
+ }
+
+ _output_block_index++;
+}
+
+void AnalyticSinkLocalState::_init_result_columns() {
+ if (_current_row_position + _have_removed_rows ==
+ _input_block_first_row_positions[_output_block_index]) {
+ _result_window_columns.resize(_agg_functions_size);
+ // return type create result column
+ for (size_t i = 0; i < _agg_functions_size; ++i) {
+ _result_window_columns[i] =
_agg_functions[i]->data_type()->create_column();
+ if (_result_column_could_resize[i]) {
+
_result_window_columns[i]->resize(_input_blocks[_output_block_index].rows());
+ } else {
+
_result_window_columns[i]->reserve(_input_blocks[_output_block_index].rows());
+ }
+ }
+ }
+}
+
+void
AnalyticSinkLocalState::_refresh_buffer_and_dependency_state(vectorized::Block*
block) {
+ size_t buffer_size = 0;
+ {
+ std::unique_lock<std::mutex> lc(_shared_state->buffer_mutex);
+ _shared_state->blocks_buffer.push(std::move(*block));
+ buffer_size = _shared_state->blocks_buffer.size();
+ }
+ if (buffer_size > 128) {
+ // the buffer has enough data, so the sink can be blocked
+ _dependency->block();
+ }
+ // data has been pushed to the buffer, so signal the source to read
+ _dependency->set_ready_to_read();
+}
+
+void AnalyticSinkLocalState::_reset_state_for_next_partition() {
+ _partition_column_statistics.update(_partition_by_pose.end -
_partition_by_pose.start);
+ _order_by_column_statistics.reset();
+ _partition_by_pose.start = _partition_by_pose.end;
+ _current_row_position = _partition_by_pose.start;
+ _reset_agg_status();
+}
+
+void AnalyticSinkLocalState::_update_order_by_range() {
+ // still have more data
+ if (_order_by_pose.is_ended && _current_row_position < _order_by_pose.end)
{
+ return;
+ }
+ SCOPED_TIMER(_order_search_timer);
+ while (!_next_order_by_ends.empty()) {
+ int64_t peek = _next_order_by_ends.front();
+ _next_order_by_ends.pop();
+ if (peek > _order_by_pose.end) {
+ _order_by_pose.start = _order_by_pose.end;
+ _order_by_pose.end = peek;
+ _order_by_pose.is_ended = true;
+ _order_by_column_statistics.update(_order_by_pose.end -
_order_by_pose.start);
+ return;
+ }
+ }
+
+ if (_order_by_pose.is_ended) {
+ _order_by_pose.start = _order_by_pose.end;
+ }
+ _order_by_pose.end = _partition_by_pose.end;
+
+ {
+ if (_order_by_pose.start < _order_by_pose.end) {
+ for (size_t i = 0; i < _order_by_exprs_size; ++i) {
+ _order_by_pose.end = find_first_not_equal(
+ _order_by_columns[i].get(), _order_by_columns[i].get(),
+ _order_by_pose.start, _order_by_pose.start,
_order_by_pose.end);
+ }
+ }
+ }
+
+ if (_order_by_pose.end < _partition_by_pose.end) {
+ _order_by_column_statistics.update(_order_by_pose.end -
_order_by_pose.start);
+ _order_by_pose.is_ended = true;
+ _find_next_order_by_ends();
+ return;
+ }
+ DCHECK_EQ(_partition_by_pose.end, _order_by_pose.end);
+ if (_partition_by_pose.is_ended) {
+ _order_by_pose.is_ended = true;
+ return;
+ }
+ _order_by_pose.is_ended = false;
+}
+
+void AnalyticSinkLocalState::_get_partition_by_end() {
+ //still have data, return partition_by_end directly
+ if (_partition_by_pose.is_ended && _current_row_position <
_partition_by_pose.end) {
+ return;
+ }
+ //no partition_by, the all block is end
+ if (_partition_by_eq_expr_ctxs.empty() || (_input_total_rows == 0)) {
+ _partition_by_pose.end = _input_total_rows - _have_removed_rows;
+ _partition_by_pose.is_ended = _input_eos;
+ return;
+ }
+ SCOPED_TIMER(_partition_search_timer);
+ while (!_next_partition_ends.empty()) {
+ int64_t peek = _next_partition_ends.front();
+ _next_partition_ends.pop();
+ if (peek > _partition_by_pose.end) {
+ _partition_by_pose.end = peek;
+ _partition_by_pose.is_ended = true;
+ return;
+ }
+ }
+
+ const auto start = _partition_by_pose.end;
+ const auto target = (_partition_by_pose.is_ended || _partition_by_pose.end
== 0)
+ ? _partition_by_pose.end
+ : _partition_by_pose.end - 1;
+ DCHECK(_partition_exprs_size > 0);
+ const auto partition_column_rows = _partition_by_columns[0]->size();
+ _partition_by_pose.end = partition_column_rows;
+
+ {
+ if (start < _partition_by_pose.end) {
+ for (size_t i = 0; i < _partition_exprs_size; ++i) {
+ _partition_by_pose.end = find_first_not_equal(
+ _partition_by_columns[i].get(),
_partition_by_columns[i].get(), target,
+ start, _partition_by_pose.end);
+ }
+ }
+ }
+
+ if (_partition_by_pose.end < partition_column_rows) {
+ _partition_by_pose.is_ended = true;
+ _find_next_partition_ends();
+ return;
+ }
+
+ DCHECK_EQ(_partition_by_pose.end, partition_column_rows);
+ _partition_by_pose.is_ended = _input_eos;
+}
+
+void AnalyticSinkLocalState::_find_next_partition_ends() {
+ if (!_partition_column_statistics.is_high_cardinality()) {
+ return;
+ }
+
+ SCOPED_TIMER(_partition_search_timer);
+ for (size_t i = _partition_by_pose.end + 1; i <
_partition_by_columns[0]->size(); ++i) {
+ for (auto& column : _partition_by_columns) {
+ auto cmp = column->compare_at(i - 1, i, *column, 1);
+ if (cmp != 0) {
+ _next_partition_ends.push(i);
+ break;
+ }
}
}
- start.row_num = start_pos; //update row num, return the find end
- return start;
}
-BlockRowPos AnalyticSinkLocalState::_get_partition_by_end() {
- auto& shared_state = *_shared_state;
- if (shared_state.current_row_position <
- shared_state.partition_by_end.pos) { //still have data, return
partition_by_end directly
- return shared_state.partition_by_end;
+void AnalyticSinkLocalState::_find_next_order_by_ends() {
+ if (!_order_by_column_statistics.is_high_cardinality()) {
+ return;
}
- if (_partition_by_eq_expr_ctxs.empty() ||
- (shared_state.input_total_rows == 0)) { //no partition_by, the all
block is end
- return shared_state.all_block_end;
+ SCOPED_TIMER(_order_search_timer);
+ for (size_t i = _order_by_pose.end + 1; i < _partition_by_pose.end; ++i) {
+ for (auto& column : _order_by_columns) {
+ auto cmp = column->compare_at(i - 1, i, *column, 1);
+ if (cmp != 0) {
+ _next_order_by_ends.push(i);
+ break;
+ }
+ }
}
+}
- BlockRowPos cal_end = shared_state.all_block_end;
- for (size_t i = 0; i < _partition_by_eq_expr_ctxs.size();
- ++i) { //have partition_by, binary search the partiton end
- cal_end =
_compare_row_to_find_end(shared_state.partition_by_column_idxs[i],
- shared_state.partition_by_end,
cal_end);
+// Compares (*this)[n] and rhs[m]
+int64_t AnalyticSinkLocalState::find_first_not_equal(vectorized::IColumn*
reference_column,
+ vectorized::IColumn*
compared_column,
+ int64_t target, int64_t
start, int64_t end) {
+ while (start + 1 < end) {
+ int64_t mid = start + (end - start) / 2;
+ if (reference_column->compare_at(target, mid, *compared_column, 1) ==
0) {
+ start = mid;
+ } else {
+ end = mid;
+ }
+ }
+ if (reference_column->compare_at(target, end - 1, *compared_column, 1) ==
0) {
+ return end;
}
- cal_end.pos =
shared_state.input_block_first_row_positions[cal_end.block_num] +
cal_end.row_num;
- return cal_end;
+ return end - 1;
}
AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id,
const TPlanNode& tnode, const
DescriptorTbl& descs,
bool require_bucket_distribution)
: DataSinkOperatorX(operator_id, tnode.node_id),
+ _pool(pool),
+ _intermediate_tuple_id(tnode.analytic_node.intermediate_tuple_id),
+ _output_tuple_id(tnode.analytic_node.output_tuple_id),
_buffered_tuple_id(tnode.analytic_node.__isset.buffered_tuple_id
? tnode.analytic_node.buffered_tuple_id
: 0),
@@ -203,20 +617,33 @@ AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool*
pool, int operator_id,
_require_bucket_distribution(require_bucket_distribution),
_partition_exprs(tnode.__isset.distribute_expr_lists &&
require_bucket_distribution
? tnode.distribute_expr_lists[0]
- : tnode.analytic_node.partition_exprs) {
+ : tnode.analytic_node.partition_exprs),
+ _window(tnode.analytic_node.window),
+ _has_window(tnode.analytic_node.__isset.window),
+ _has_range_window(tnode.analytic_node.window.type ==
TAnalyticWindowType::RANGE),
+ _has_window_start(tnode.analytic_node.window.__isset.window_start),
+ _has_window_end(tnode.analytic_node.window.__isset.window_end) {
_is_serial_operator = tnode.__isset.is_serial_operator &&
tnode.is_serial_operator;
}
Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState*
state) {
RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
const TAnalyticNode& analytic_node = tnode.analytic_node;
- size_t agg_size = analytic_node.analytic_functions.size();
- _agg_expr_ctxs.resize(agg_size);
- _num_agg_input.resize(agg_size);
- for (int i = 0; i < agg_size; ++i) {
+ _agg_functions_size = analytic_node.analytic_functions.size();
+ _agg_expr_ctxs.resize(_agg_functions_size);
+ _num_agg_input.resize(_agg_functions_size);
+ for (int i = 0; i < _agg_functions_size; ++i) {
const TExpr& desc = analytic_node.analytic_functions[i];
- _num_agg_input[i] = desc.nodes[0].num_children;
+ vectorized::AggFnEvaluator* evaluator = nullptr;
+ // Window function treats all NullableAggregateFunction as
AlwaysNullable.
+ // Its behavior is same with executed without group by key.
+ // https://github.com/apache/doris/pull/40693
+ RETURN_IF_ERROR(vectorized::AggFnEvaluator::create(_pool, desc, {},
/*without_key*/ true,
+ &evaluator));
+ _agg_functions.emplace_back(evaluator);
+
int node_idx = 0;
+ _num_agg_input[i] = desc.nodes[0].num_children;
for (int j = 0; j < desc.nodes[0].num_children; ++j) {
++node_idx;
vectorized::VExprSPtr expr;
@@ -231,7 +658,8 @@ Status AnalyticSinkOperatorX::init(const TPlanNode& tnode,
RuntimeState* state)
_partition_by_eq_expr_ctxs));
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(analytic_node.order_by_exprs,
_order_by_eq_expr_ctxs));
- _agg_functions_size = agg_size;
+
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(analytic_node.range_between_offset_exprs,
+
_range_between_expr_ctxs));
return Status::OK();
}
@@ -240,6 +668,18 @@ Status AnalyticSinkOperatorX::open(RuntimeState* state) {
for (const auto& ctx : _agg_expr_ctxs) {
RETURN_IF_ERROR(vectorized::VExpr::prepare(ctx, state,
_child->row_desc()));
}
+ _intermediate_tuple_desc =
state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
+ _output_tuple_desc =
state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
+ _change_to_nullable_flags.resize(_agg_functions_size);
+ for (size_t i = 0; i < _agg_functions_size; ++i) {
+ SlotDescriptor* intermediate_slot_desc =
_intermediate_tuple_desc->slots()[i];
+ SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[i];
+ RETURN_IF_ERROR(_agg_functions[i]->prepare(state, _child->row_desc(),
+ intermediate_slot_desc,
output_slot_desc));
+ _agg_functions[i]->set_version(state->be_exec_version());
+ _change_to_nullable_flags[i] =
+ output_slot_desc->is_nullable() &&
(!_agg_functions[i]->data_type()->is_nullable());
+ }
if (!_partition_by_eq_expr_ctxs.empty() ||
!_order_by_eq_expr_ctxs.empty()) {
vector<TTupleId> tuple_ids;
tuple_ids.push_back(_child->row_desc().tuple_descriptors()[0]->id());
@@ -254,11 +694,39 @@ Status AnalyticSinkOperatorX::open(RuntimeState* state) {
vectorized::VExpr::prepare(_order_by_eq_expr_ctxs, state,
cmp_row_desc));
}
}
+ if (!_range_between_expr_ctxs.empty()) {
+ DCHECK(_range_between_expr_ctxs.size() == 2);
+ RETURN_IF_ERROR(
+ vectorized::VExpr::prepare(_range_between_expr_ctxs, state,
_child->row_desc()));
+ }
+ RETURN_IF_ERROR(vectorized::VExpr::open(_range_between_expr_ctxs, state));
RETURN_IF_ERROR(vectorized::VExpr::open(_partition_by_eq_expr_ctxs,
state));
RETURN_IF_ERROR(vectorized::VExpr::open(_order_by_eq_expr_ctxs, state));
for (size_t i = 0; i < _agg_functions_size; ++i) {
+ RETURN_IF_ERROR(_agg_functions[i]->open(state));
RETURN_IF_ERROR(vectorized::VExpr::open(_agg_expr_ctxs[i], state));
}
+
+ _offsets_of_aggregate_states.resize(_agg_functions_size);
+ for (size_t i = 0; i < _agg_functions_size; ++i) {
+ _offsets_of_aggregate_states[i] = _total_size_of_aggregate_states;
+ const auto& agg_function = _agg_functions[i]->function();
+ // aggregate states are aligned based on maximum requirement
+ _align_aggregate_states = std::max(_align_aggregate_states,
agg_function->align_of_data());
+ _total_size_of_aggregate_states += agg_function->size_of_data();
+ // If this is not the last aggregate_state, we need to pad it so that the next
+ // aggregate_state will be aligned.
+ if (i + 1 < _agg_functions_size) {
+ size_t alignment_of_next_state = _agg_functions[i +
1]->function()->align_of_data();
+ if ((alignment_of_next_state & (alignment_of_next_state - 1)) !=
0) {
+ return Status::RuntimeError("Logical error: align_of_data is
not 2^N");
+ }
+ /// Extend total_size to next alignment requirement
+ /// Add padding by rounding up 'total_size_of_aggregate_states' to
+ /// be a multiple of alignment_of_next_state.
+ _total_size_of_aggregate_states =
+ (_total_size_of_aggregate_states + alignment_of_next_state
- 1) /
+ alignment_of_next_state * alignment_of_next_state;
+ }
+ }
return Status::OK();
}
@@ -267,80 +735,139 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState*
state, vectorized::Block
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)input_block->rows());
-
local_state._reserve_mem_size = 0;
SCOPED_PEAK_MEM(&local_state._reserve_mem_size);
+ local_state._input_eos = eos;
+ local_state._remove_unused_rows();
+ RETURN_IF_ERROR(_add_input_block(state, input_block));
+ RETURN_IF_ERROR(local_state._execute_impl());
+ if (local_state._input_eos) {
+ std::unique_lock<std::mutex>
lc(local_state._shared_state->sink_eos_lock);
+ local_state._shared_state->sink_eos = true;
+ local_state._dependency->set_ready_to_read(); // ready for source to
read
+ }
+ return Status::OK();
+}
- local_state._shared_state->input_eos = eos;
- if (local_state._shared_state->input_eos && input_block->rows() == 0) {
- local_state._dependency->set_ready_to_read();
- local_state._dependency->block();
+size_t AnalyticSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bool
eos) {
+ auto& local_state = get_local_state(state);
+ return local_state._reserve_mem_size;
+}
+
+Status AnalyticSinkOperatorX::_add_input_block(doris::RuntimeState* state,
+ vectorized::Block* input_block)
{
+ if (input_block->rows() <= 0) {
return Status::OK();
}
-
- local_state._shared_state->input_block_first_row_positions.emplace_back(
- local_state._shared_state->input_total_rows);
+ auto& local_state = get_local_state(state);
+
local_state._input_block_first_row_positions.emplace_back(local_state._input_total_rows);
size_t block_rows = input_block->rows();
- local_state._shared_state->input_total_rows += block_rows;
- local_state._shared_state->all_block_end.block_num =
- local_state._shared_state->input_blocks.size();
- local_state._shared_state->all_block_end.row_num = block_rows;
- local_state._shared_state->all_block_end.pos =
local_state._shared_state->input_total_rows;
+ local_state._input_total_rows += block_rows;
// record origin columns, maybe be after this, could cast some column but
no need to output
auto column_to_keep = input_block->columns();
-
{
SCOPED_TIMER(local_state._compute_agg_data_timer);
- for (size_t i = 0; i < _agg_functions_size;
- ++i) { //insert _agg_input_columns, execute calculate for its
+ // Evaluate each aggregate argument expression and append the result to _agg_input_columns;
+ // rows of those columns that have already been used may be removed later.
+ for (size_t i = 0; i < _agg_functions_size; ++i) {
for (size_t j = 0; j < local_state._agg_expr_ctxs[i].size(); ++j) {
- RETURN_IF_ERROR(_insert_range_column(
- input_block, local_state._agg_expr_ctxs[i][j],
-
local_state._shared_state->agg_input_columns[i][j].get(), block_rows));
+ RETURN_IF_ERROR(_insert_range_column(input_block,
local_state._agg_expr_ctxs[i][j],
+
local_state._agg_input_columns[i][j].get(),
+ block_rows));
}
}
}
{
SCOPED_TIMER(local_state._compute_partition_by_timer);
for (size_t i = 0; i < local_state._partition_by_eq_expr_ctxs.size();
++i) {
- int result_col_id = -1;
-
RETURN_IF_ERROR(local_state._partition_by_eq_expr_ctxs[i]->execute(input_block,
-
&result_col_id));
- DCHECK_GE(result_col_id, 0);
- local_state._shared_state->partition_by_column_idxs[i] =
result_col_id;
+ RETURN_IF_ERROR(
+ _insert_range_column(input_block,
local_state._partition_by_eq_expr_ctxs[i],
+
local_state._partition_by_columns[i].get(), block_rows));
}
}
-
{
SCOPED_TIMER(local_state._compute_order_by_timer);
for (size_t i = 0; i < local_state._order_by_eq_expr_ctxs.size(); ++i)
{
- int result_col_id = -1;
+ RETURN_IF_ERROR(_insert_range_column(input_block,
local_state._order_by_eq_expr_ctxs[i],
+
local_state._order_by_columns[i].get(),
+ block_rows));
+ }
+ }
+ {
+ SCOPED_TIMER(local_state._compute_range_between_function_timer);
+ for (size_t i = 0; i < local_state._range_between_expr_ctxs.size();
++i) {
RETURN_IF_ERROR(
-
local_state._order_by_eq_expr_ctxs[i]->execute(input_block, &result_col_id));
- DCHECK_GE(result_col_id, 0);
- local_state._shared_state->ordey_by_column_idxs[i] = result_col_id;
+ _insert_range_column(input_block,
local_state._range_between_expr_ctxs[i],
+
local_state._range_result_columns[i].get(), block_rows));
}
}
-
vectorized::Block::erase_useless_column(input_block, column_to_keep);
-
COUNTER_UPDATE(local_state._memory_used_counter,
input_block->allocated_bytes());
+ COUNTER_UPDATE(local_state._blocks_memory_usage,
input_block->allocated_bytes());
+ local_state._input_blocks.emplace_back(std::move(*input_block));
+ return Status::OK();
+}
+
+void AnalyticSinkLocalState::_remove_unused_rows() { // Drops buffered column rows in front of an older block boundary and rebases every stored row position.
+ const size_t block_num = 256; // the trim boundary sits this many input blocks past the previous one
+ if (_removed_block_index + block_num + 1 >=
_input_block_first_row_positions.size()) {
+ return; // not enough blocks recorded beyond the previous trim point
+ }
+ const int64_t unused_rows_pos =
+ _input_block_first_row_positions[_removed_block_index + block_num]; // first-row position of the boundary block, counted in total input rows
+
+ if (_streaming_mode) {
+ auto idx = _output_block_index - 1;
+ if (idx < 0 || _input_block_first_row_positions[idx] <=
unused_rows_pos) {
+ return; // idx is negative, or block idx does not start past the boundary
+ }
+ } else {
+ if (_have_removed_rows + _partition_by_pose.start <= unused_rows_pos) {
+ return; // the current partition's start (in total input rows) is not past the boundary
+ }
+ }
- //TODO: if need improvement, the is a tips to maintain a free queue,
- //so the memory could reuse, no need to new/delete again;
-
local_state._shared_state->input_blocks.emplace_back(std::move(*input_block));
+ const int64_t remove_rows = unused_rows_pos - _have_removed_rows; // rows to drop from the front of the buffered columns
+ auto left_rows = _input_total_rows - _have_removed_rows - remove_rows; // rows that stay buffered afterwards
{
- SCOPED_TIMER(local_state._evaluation_timer);
- local_state._shared_state->found_partition_end =
local_state._get_partition_by_end();
+ SCOPED_TIMER(_remove_rows_timer);
+ for (size_t i = 0; i < _agg_functions_size; i++) {
+ for (size_t j = 0; j < _agg_expr_ctxs[i].size(); j++) {
+ _agg_input_columns[i][j] =
+ _agg_input_columns[i][j]->cut(remove_rows,
left_rows)->assume_mutable(); // presumably keeps left_rows rows starting at offset remove_rows - TODO confirm IColumn::cut
+ }
+ }
+ for (size_t i = 0; i < _partition_exprs_size; i++) {
+ _partition_by_columns[i] =
+ _partition_by_columns[i]->cut(remove_rows,
left_rows)->assume_mutable();
+ }
+ for (size_t i = 0; i < _order_by_exprs_size; i++) {
+ _order_by_columns[i] =
+ _order_by_columns[i]->cut(remove_rows,
left_rows)->assume_mutable();
+ } // NOTE(review): _range_result_columns is appended to in _add_input_block but is not cut here - confirm it is never indexed with the rebased positions
}
- local_state._refresh_need_more_input();
- return Status::OK();
-}
+ COUNTER_UPDATE(_remove_count, 1);
+ COUNTER_UPDATE(_remove_rows, remove_rows);
+ _current_row_position -= remove_rows; // rebase the cursor to the shortened columns
+ _partition_by_pose.remove_unused_rows(remove_rows); // shifts start and end back by remove_rows
+ _order_by_pose.remove_unused_rows(remove_rows);
+ int64_t candidate_partition_end_size = _next_partition_ends.size();
+ while (--candidate_partition_end_size >= 0) { // rotate the queue once, rebasing every queued partition end
+ auto peek = _next_partition_ends.front();
+ _next_partition_ends.pop();
+ _next_partition_ends.push(peek - remove_rows);
+ }
+ int64_t candidate_peer_group_end_size = _next_order_by_ends.size();
+ while (--candidate_peer_group_end_size >= 0) { // same rotation for the queued order-by ends
+ auto peek = _next_order_by_ends.front();
+ _next_order_by_ends.pop();
+ _next_order_by_ends.push(peek - remove_rows);
+ }
+ _removed_block_index += block_num; // the next trim boundary is another block_num blocks ahead
+ _have_removed_rows += remove_rows; // running total of rows trimmed so far
-size_t AnalyticSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bool
eos) {
- auto& local_state = get_local_state(state);
- return local_state._reserve_mem_size;
+ DCHECK_GE(_current_row_position, 0); // the rebased positions must not go negative
+ DCHECK_GE(_partition_by_pose.end, 0);
}
Status AnalyticSinkOperatorX::_insert_range_column(vectorized::Block* block,
@@ -354,6 +881,35 @@ Status
AnalyticSinkOperatorX::_insert_range_column(vectorized::Block* block,
return Status::OK();
}
+void AnalyticSinkLocalState::_reset_agg_status() { // Calls reset() on every aggregate state stored at _fn_place_ptr.
+ for (size_t i = 0; i < _agg_functions_size; ++i) {
+ _agg_functions[i]->reset(_fn_place_ptr +
_offsets_of_aggregate_states[i]); // state i sits at its own offset from _fn_place_ptr
+ }
+}
+
+void AnalyticSinkLocalState::_create_agg_status() { // Constructs every aggregate state in the memory at _fn_place_ptr.
+ for (size_t i = 0; i < _agg_functions_size; ++i) {
+ try {
+ _agg_functions[i]->create(_fn_place_ptr +
_offsets_of_aggregate_states[i]); // state i sits at its own offset from _fn_place_ptr
+ } catch (...) {
+ for (int j = 0; j < i; ++j) { // roll back: destroy only the states created before the failure
+ _agg_functions[j]->destroy(_fn_place_ptr +
_offsets_of_aggregate_states[j]);
+ }
+ throw; // rethrow the original exception to the caller
+ }
+ }
+ _agg_functions_created = true; // checked by _destroy_agg_status() before it destroys anything
+}
+
+void AnalyticSinkLocalState::_destroy_agg_status() { // Destroys every aggregate state; does nothing if they were never created.
+ if (UNLIKELY(_fn_place_ptr == nullptr || !_agg_functions_created)) {
+ return; // no state memory assigned, or _create_agg_status() did not complete
+ }
+ for (size_t i = 0; i < _agg_functions_size; ++i) {
+ _agg_functions[i]->destroy(_fn_place_ptr +
_offsets_of_aggregate_states[i]);
+ } // NOTE(review): _agg_functions_created is not cleared here - confirm this runs at most once
+}
+
template class DataSinkOperatorX<AnalyticSinkLocalState>;
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h
b/be/src/pipeline/exec/analytic_sink_operator.h
index 67ef7df783d..3d910aaab7f 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -28,6 +28,40 @@ namespace doris {
namespace pipeline {
class AnalyticSinkOperatorX;
+struct BoundaryPose { // Start/end row positions of one boundary (used for the partition-by and order-by ranges).
+ int64_t start = 0; // first row position of the range
+ int64_t end = 0; // end row position; presumably exclusive - TODO confirm against the users of this struct
+ bool is_ended = false; // presumably set once the final end of the range is known - TODO confirm
+ void remove_unused_rows(int64_t cnt) { // rebase both positions after cnt leading rows were dropped
+ start -= cnt;
+ end -= cnt;
+ }
+};
+
+class PartitionStatistics { // Running count, total and integer mean of the sizes passed to update().
+public:
+ void update(int64_t size) { // record one more sample of the given size
+ _count++;
+ _cumulative_size += size;
+ _average_size = _cumulative_size / _count; // integer division; _count was just incremented
+ }
+
+ void reset() { // forget all samples
+ _count = 0;
+ _cumulative_size = 0;
+ _average_size = 0;
+ }
+
+ bool is_high_cardinality() const { return _count > 16 && _average_size <
8; } // true after more than 16 samples whose mean size is below 8
+
+ int64_t _count = 0; // number of update() calls since construction or reset()
+ int64_t _cumulative_size = 0; // sum of all recorded sizes
+ int64_t _average_size = 0; // _cumulative_size / _count, truncated
+};
+
+// These functions need partition info to calculate their results, so they can't be used in
+// streaming mode.
+static const std::set<std::string> PARTITION_FUNCTION_SET {"ntile",
"cume_dist", "percent_rank"};
+
class AnalyticSinkLocalState : public
PipelineXSinkLocalState<AnalyticSharedState> {
ENABLE_FACTORY_CREATOR(AnalyticSinkLocalState);
@@ -37,36 +71,106 @@ public:
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;
+ Status close(RuntimeState* state, Status exec_status) override;
private:
friend class AnalyticSinkOperatorX;
-
- bool _refresh_need_more_input() {
- auto need_more_input =
_whether_need_next_partition(_shared_state->found_partition_end);
- if (need_more_input) {
- _dependency->set_block_to_read();
- _dependency->set_ready();
- } else {
- _dependency->block();
- _dependency->set_ready_to_read();
- }
- return need_more_input;
+ Status _execute_impl();
+ // over(partition by k1 order by k2 range|rows unbounded preceding and
unbounded following)
+ bool _get_next_for_partition(int64_t batch_rows, int64_t
current_block_base_pos);
+ // over(partition by k1 order by k2 range between unbounded preceding and
current row)
+ bool _get_next_for_unbounded_range(int64_t batch_rows, int64_t
current_block_base_pos);
+ // over(partition by k1 order by k2 range between M preceding and N
following)
+ bool _get_next_for_range_between(int64_t batch_rows, int64_t
current_block_base_pos);
+ // over(partition by k1 order by k2 rows between unbounded preceding and
current row)
+ bool _get_next_for_unbounded_rows(int64_t batch_rows, int64_t
current_block_base_pos);
+ // over(partition by k1 order by k2 rows between M preceding and N
following)
+ bool _get_next_for_sliding_rows(int64_t batch_rows, int64_t
current_block_base_pos);
+
+ void _init_result_columns();
+ void _execute_for_function(int64_t partition_start, int64_t partition_end,
int64_t frame_start,
+ int64_t frame_end);
+ void _insert_result_info(int64_t start, int64_t end);
+ int64_t current_pos_in_block() {
+ return _current_row_position + _have_removed_rows -
+ _input_block_first_row_positions[_output_block_index];
}
- BlockRowPos _get_partition_by_end();
- BlockRowPos _compare_row_to_find_end(int64_t idx, BlockRowPos start,
BlockRowPos end,
- bool need_check_first = false);
- bool _whether_need_next_partition(BlockRowPos& found_partition_end);
-
- RuntimeProfile::Counter* _evaluation_timer = nullptr;
- RuntimeProfile::Counter* _compute_agg_data_timer = nullptr;
- RuntimeProfile::Counter* _compute_partition_by_timer = nullptr;
- RuntimeProfile::Counter* _compute_order_by_timer = nullptr;
+ void _output_current_block(vectorized::Block* block);
+ void _reset_state_for_next_partition();
+ void _refresh_buffer_and_dependency_state(vectorized::Block* block);
+
+ void _create_agg_status();
+ void _reset_agg_status();
+ void _destroy_agg_status();
+ void _remove_unused_rows();
+
+ void _get_partition_by_end();
+ void _find_next_partition_ends();
+ void _update_order_by_range();
+ void _find_next_order_by_ends();
+ int64_t find_first_not_equal(vectorized::IColumn* reference_column,
+ vectorized::IColumn* compared_column, int64_t
target,
+ int64_t start, int64_t end);
std::vector<vectorized::VExprContextSPtrs> _agg_expr_ctxs;
vectorized::VExprContextSPtrs _partition_by_eq_expr_ctxs;
vectorized::VExprContextSPtrs _order_by_eq_expr_ctxs;
+ vectorized::VExprContextSPtrs _range_between_expr_ctxs;
+ std::vector<std::vector<vectorized::MutableColumnPtr>> _agg_input_columns;
+ std::vector<vectorized::MutableColumnPtr> _partition_by_columns;
+ std::vector<vectorized::MutableColumnPtr> _order_by_columns;
+ std::vector<vectorized::MutableColumnPtr> _range_result_columns;
+ size_t _partition_exprs_size = 0;
+ size_t _order_by_exprs_size = 0;
+ BoundaryPose _partition_by_pose;
+ BoundaryPose _order_by_pose;
+ PartitionStatistics _partition_column_statistics;
+ PartitionStatistics _order_by_column_statistics;
+ std::queue<int64_t> _next_partition_ends;
+ std::queue<int64_t> _next_order_by_ends;
+ size_t _agg_functions_size = 0;
+ bool _agg_functions_created = false;
+ vectorized::AggregateDataPtr _fn_place_ptr = nullptr;
+ std::unique_ptr<vectorized::Arena> _agg_arena_pool = nullptr;
+ std::vector<vectorized::AggFnEvaluator*> _agg_functions;
+ std::vector<size_t> _offsets_of_aggregate_states;
+ std::vector<bool> _result_column_nullable_flags;
+ std::vector<bool> _result_column_could_resize;
+
+ using vectorized_get_next = bool (AnalyticSinkLocalState::*)(int64_t,
int64_t);
+ struct executor {
+ vectorized_get_next get_next_impl;
+ };
+ executor _executor;
+
+ bool _current_window_empty = false;
+ bool _streaming_mode = false;
+ int64_t _current_row_position = 0;
+ int64_t _output_block_index = 0;
+ std::vector<vectorized::MutableColumnPtr> _result_window_columns;
+
+ int64_t _rows_start_offset = 0;
+ int64_t _rows_end_offset = 0;
+ int64_t _input_total_rows = 0;
+ bool _input_eos = false;
+ std::vector<vectorized::Block> _input_blocks;
+ std::vector<int64_t> _input_block_first_row_positions;
+ int64_t _removed_block_index = 0;
+ int64_t _have_removed_rows = 0;
int64_t _reserve_mem_size = 0;
+
+ RuntimeProfile::Counter* _evaluation_timer = nullptr;
+ RuntimeProfile::Counter* _compute_agg_data_timer = nullptr;
+ RuntimeProfile::Counter* _compute_partition_by_timer = nullptr;
+ RuntimeProfile::Counter* _compute_order_by_timer = nullptr;
+ RuntimeProfile::Counter* _compute_range_between_function_timer = nullptr;
+ RuntimeProfile::Counter* _partition_search_timer = nullptr;
+ RuntimeProfile::Counter* _order_search_timer = nullptr;
+ RuntimeProfile::Counter* _remove_rows_timer = nullptr;
+ RuntimeProfile::Counter* _remove_count = nullptr;
+ RuntimeProfile::Counter* _remove_rows = nullptr;
+ RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage = nullptr;
};
class AnalyticSinkOperatorX final : public
DataSinkOperatorX<AnalyticSinkLocalState> {
@@ -98,23 +202,44 @@ public:
size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
private:
+ friend class AnalyticSinkLocalState;
Status _insert_range_column(vectorized::Block* block, const
vectorized::VExprContextSPtr& expr,
vectorized::IColumn* dst_column, size_t
length);
+ Status _add_input_block(doris::RuntimeState* state, vectorized::Block*
input_block);
- friend class AnalyticSinkLocalState;
-
+ ObjectPool* _pool = nullptr;
std::vector<vectorized::VExprContextSPtrs> _agg_expr_ctxs;
vectorized::VExprContextSPtrs _partition_by_eq_expr_ctxs;
vectorized::VExprContextSPtrs _order_by_eq_expr_ctxs;
+ vectorized::VExprContextSPtrs _range_between_expr_ctxs;
size_t _agg_functions_size = 0;
+ std::vector<size_t> _num_agg_input;
+ std::vector<vectorized::AggFnEvaluator*> _agg_functions;
+ TupleId _intermediate_tuple_id;
+ TupleId _output_tuple_id;
+ TupleDescriptor* _intermediate_tuple_desc = nullptr;
+ TupleDescriptor* _output_tuple_desc = nullptr;
const TTupleId _buffered_tuple_id;
- std::vector<size_t> _num_agg_input;
const bool _is_colocate;
const bool _require_bucket_distribution;
const std::vector<TExpr> _partition_exprs;
+
+ TAnalyticWindow _window;
+ bool _has_window;
+ bool _has_range_window;
+ bool _has_window_start;
+ bool _has_window_end;
+
+ /// The byte offset of the n-th function's aggregate state.
+ std::vector<size_t> _offsets_of_aggregate_states;
+ /// The total size of all functions' aggregate states, including alignment padding.
+ size_t _total_size_of_aggregate_states = 0;
+ /// The max align size for functions
+ size_t _align_aggregate_states = 1;
+ std::vector<bool> _change_to_nullable_flags;
};
} // namespace pipeline
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp
b/be/src/pipeline/exec/analytic_source_operator.cpp
index 4c455b94c9c..ce6f0d1d107 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -17,6 +17,7 @@
#include "analytic_source_operator.h"
+#include <cstddef>
#include <string>
#include "pipeline/exec/operator.h"
@@ -27,587 +28,68 @@ namespace doris::pipeline {
#include "common/compile_check_begin.h"
AnalyticLocalState::AnalyticLocalState(RuntimeState* state, OperatorXBase*
parent)
- : PipelineXLocalState<AnalyticSharedState>(state, parent),
- _output_block_index(0),
- _window_end_position(0),
- _next_partition(false),
- _rows_start_offset(0),
- _rows_end_offset(0),
- _fn_place_ptr(nullptr),
- _agg_functions_size(0),
- _agg_functions_created(false),
- _agg_arena_pool(std::make_unique<vectorized::Arena>()) {}
-
-//_partition_by_columns,_order_by_columns save in blocks, so if need to
calculate the boundary, may find in which blocks firstly
-BlockRowPos AnalyticLocalState::_compare_row_to_find_end(int64_t idx,
BlockRowPos start,
- BlockRowPos end, bool
need_check_first) {
- auto& shared_state = *_shared_state;
- int64_t start_init_row_num = start.row_num;
- vectorized::ColumnPtr start_column =
-
shared_state.input_blocks[start.block_num].get_by_position(idx).column;
- vectorized::ColumnPtr start_next_block_column = start_column;
-
- DCHECK_LE(start.block_num, end.block_num);
- DCHECK_LE(start.block_num, shared_state.input_blocks.size() - 1);
- int64_t start_block_num = start.block_num;
- int64_t end_block_num = end.block_num;
- int64_t mid_blcok_num = end.block_num;
- // To fix this problem: https://github.com/apache/doris/issues/15951
- // in this case, the partition by column is last row of block, so it's
pointed to a new block at row = 0, range is: [left, right)
- // From the perspective of order by column, the two values are exactly
equal.
- // so the range will be get wrong because it's compare_at == 0 with next
block at row = 0
- if (need_check_first && end.block_num > 0 && end.row_num == 0) {
- end.block_num--;
- end_block_num--;
- end.row_num = shared_state.input_blocks[end_block_num].rows();
- }
- //binary search find in which block
- while (start_block_num < end_block_num) {
- mid_blcok_num = (start_block_num + end_block_num + 1) >> 1;
- start_next_block_column =
-
shared_state.input_blocks[mid_blcok_num].get_by_position(idx).column;
- //Compares (*this)[n] and rhs[m], this: start[init_row] rhs: mid[0]
- if (start_column->compare_at(start_init_row_num, 0,
*start_next_block_column, 1) == 0) {
- start_block_num = mid_blcok_num;
- } else {
- end_block_num = mid_blcok_num - 1;
- }
- }
-
- // have check the start.block_num: start_column[start_init_row_num] with
mid_blcok_num start_next_block_column[0]
- // now next block must not be result, so need check with end_block_num:
start_next_block_column[last_row]
- if (end_block_num == mid_blcok_num - 1) {
- start_next_block_column =
-
shared_state.input_blocks[end_block_num].get_by_position(idx).column;
- int64_t block_size = shared_state.input_blocks[end_block_num].rows();
- if ((start_column->compare_at(start_init_row_num, block_size - 1,
*start_next_block_column,
- 1) == 0)) {
- start.block_num = end_block_num + 1;
- start.row_num = 0;
- return start;
- }
- }
-
- //check whether need get column again, maybe same as first init
- // if the start_block_num have move to forword, so need update start block
num and compare it from row_num=0
- if (start_block_num != start.block_num) {
- start_init_row_num = 0;
- start.block_num = start_block_num;
- start_column =
shared_state.input_blocks[start.block_num].get_by_position(idx).column;
- }
- //binary search, set start and end pos
- int64_t start_pos = start_init_row_num;
- int64_t end_pos = shared_state.input_blocks[start.block_num].rows();
- //if end_block_num haven't moved, only start_block_num go to the end block
- //so could use the end.row_num for binary search
- if (start.block_num == end.block_num) {
- end_pos = end.row_num;
- }
- while (start_pos < end_pos) {
- int64_t mid_pos = (start_pos + end_pos) >> 1;
- if (start_column->compare_at(start_init_row_num, mid_pos,
*start_column, 1)) {
- end_pos = mid_pos;
- } else {
- start_pos = mid_pos + 1;
- }
- }
- start.row_num = start_pos; //update row num, return the find end
- return start;
-}
-
-BlockRowPos AnalyticLocalState::_get_partition_by_end() {
- auto& shared_state = *_shared_state;
- if (shared_state.current_row_position <
- shared_state.partition_by_end.pos) { //still have data, return
partition_by_end directly
- return shared_state.partition_by_end;
- }
-
- const auto partition_exprs_size =
- _parent->cast<AnalyticSourceOperatorX>()._partition_exprs_size;
- if (partition_exprs_size == 0 ||
- (shared_state.input_total_rows == 0)) { //no partition_by, the all
block is end
- return shared_state.all_block_end;
- }
-
- BlockRowPos cal_end = shared_state.all_block_end;
- for (size_t i = 0; i < partition_exprs_size;
- ++i) { //have partition_by, binary search the partiton end
- cal_end =
_compare_row_to_find_end(shared_state.partition_by_column_idxs[i],
- shared_state.partition_by_end,
cal_end);
- }
- cal_end.pos =
shared_state.input_block_first_row_positions[cal_end.block_num] +
cal_end.row_num;
- return cal_end;
-}
-
-bool AnalyticLocalState::_whether_need_next_partition(BlockRowPos&
found_partition_end) {
- auto& shared_state = *_shared_state;
- if (shared_state.input_eos ||
- (shared_state.current_row_position <
- shared_state.partition_by_end.pos)) { //now still have partition data
- return false;
- }
- const auto partition_exprs_size =
- _parent->cast<AnalyticSourceOperatorX>()._partition_exprs_size;
- if ((partition_exprs_size == 0 && !shared_state.input_eos) ||
- (found_partition_end.pos == 0)) { //no partition, get until fetch to
EOS
- return true;
- }
- if (partition_exprs_size != 0 && found_partition_end.pos ==
shared_state.all_block_end.pos &&
- !shared_state.input_eos) { //current partition data calculate done
- return true;
- }
- return false;
-}
+ : PipelineXLocalState<AnalyticSharedState>(state, parent) {}
Status AnalyticLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(PipelineXLocalState<AnalyticSharedState>::init(state,
info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
- _blocks_memory_usage =
- profile()->AddHighWaterMarkCounter("MemoryUsageBlocks",
TUnit::BYTES, "", 1);
- _evaluation_timer = ADD_TIMER(profile(), "GetPartitionBoundTime");
- _execute_timer = ADD_TIMER(profile(), "ExecuteTime");
_get_next_timer = ADD_TIMER(profile(), "GetNextTime");
- _get_result_timer = ADD_TIMER(profile(), "GetResultsTime");
- return Status::OK();
-}
-
-Status AnalyticLocalState::open(RuntimeState* state) {
- RETURN_IF_ERROR(PipelineXLocalState<AnalyticSharedState>::open(state));
- SCOPED_TIMER(exec_time_counter());
- SCOPED_TIMER(_open_timer);
-
- auto& p = _parent->cast<AnalyticSourceOperatorX>();
- _agg_functions_size = p._agg_functions.size();
- _offsets_of_aggregate_states.resize(_agg_functions_size);
- _result_column_nullable_flags.resize(_agg_functions_size);
-
- _agg_functions.resize(p._agg_functions.size());
- for (size_t i = 0; i < _agg_functions.size(); i++) {
- _agg_functions[i] = p._agg_functions[i]->clone(state,
state->obj_pool());
- _offsets_of_aggregate_states[i] = p._offsets_of_aggregate_states[i];
- _result_column_nullable_flags[i] =
-
!_agg_functions[i]->function()->get_return_type()->is_nullable() &&
- _agg_functions[i]->data_type()->is_nullable();
- }
-
- _fn_place_ptr =
_agg_arena_pool->aligned_alloc(p._total_size_of_aggregate_states,
- p._align_aggregate_states);
-
- if (!p._has_window) { //haven't set window, Unbounded: [unbounded
preceding,unbounded following]
- _executor.get_next =
std::bind<Status>(&AnalyticLocalState::_get_next_for_partition, this,
- std::placeholders::_1);
-
- } else if (p._has_range_window) {
- if (!p._has_window_end) { //haven't set end, so same as PARTITION,
[unbounded preceding, unbounded following]
- _executor.get_next =
std::bind<Status>(&AnalyticLocalState::_get_next_for_partition,
- this,
std::placeholders::_1);
-
- } else {
- _executor.get_next =
std::bind<Status>(&AnalyticLocalState::_get_next_for_range, this,
- std::placeholders::_1);
- }
-
- } else {
- if (!p._has_window_start &&
- !p._has_window_end) { //haven't set start and end, same as
PARTITION
- _executor.get_next =
std::bind<Status>(&AnalyticLocalState::_get_next_for_partition,
- this,
std::placeholders::_1);
-
- } else {
- if (p._has_window_start) { //calculate start boundary
- TAnalyticWindowBoundary b = p._window.window_start;
- if (b.__isset.rows_offset_value) { //[offset , ]
- _rows_start_offset = b.rows_offset_value;
- if (b.type == TAnalyticWindowBoundaryType::PRECEDING) {
- _rows_start_offset *= -1; //preceding--> negative
- } //current_row 0
- } else { //following positive
- DCHECK_EQ(b.type,
TAnalyticWindowBoundaryType::CURRENT_ROW); //[current row, ]
- _rows_start_offset = 0;
- }
- }
-
- if (p._has_window_end) { //calculate end boundary
- TAnalyticWindowBoundary b = p._window.window_end;
- if (b.__isset.rows_offset_value) { //[ , offset]
- _rows_end_offset = b.rows_offset_value;
- if (b.type == TAnalyticWindowBoundaryType::PRECEDING) {
- _rows_end_offset *= -1;
- }
- } else {
- DCHECK_EQ(b.type,
TAnalyticWindowBoundaryType::CURRENT_ROW); //[ ,current row]
- _rows_end_offset = 0;
- }
- }
-
- _executor.get_next =
std::bind<Status>(&AnalyticLocalState::_get_next_for_rows, this,
- std::placeholders::_1);
- }
- }
- _create_agg_status();
- return Status::OK();
-}
-
-void AnalyticLocalState::_reset_agg_status() {
- for (size_t i = 0; i < _agg_functions_size; ++i) {
- _agg_functions[i]->reset(
- _fn_place_ptr +
-
_parent->cast<AnalyticSourceOperatorX>()._offsets_of_aggregate_states[i]);
- }
-}
-
-void AnalyticLocalState::_create_agg_status() {
- for (size_t i = 0; i < _agg_functions_size; ++i) {
- try {
- _agg_functions[i]->create(
- _fn_place_ptr +
-
_parent->cast<AnalyticSourceOperatorX>()._offsets_of_aggregate_states[i]);
- } catch (...) {
- for (int j = 0; j < i; ++j) {
- _agg_functions[j]->destroy(
- _fn_place_ptr +
-
_parent->cast<AnalyticSourceOperatorX>()._offsets_of_aggregate_states[j]);
- }
- throw;
- }
- }
- _agg_functions_created = true;
-}
-
-void AnalyticLocalState::_destroy_agg_status() {
- if (UNLIKELY(_fn_place_ptr == nullptr || !_agg_functions_created)) {
- return;
- }
- for (size_t i = 0; i < _agg_functions_size; ++i) {
- _agg_functions[i]->destroy(
- _fn_place_ptr +
-
_parent->cast<AnalyticSourceOperatorX>()._offsets_of_aggregate_states[i]);
- }
-}
-
-void AnalyticLocalState::_execute_for_win_func(int64_t partition_start,
int64_t partition_end,
- int64_t frame_start, int64_t
frame_end) {
- for (size_t i = 0; i < _agg_functions_size; ++i) {
- std::vector<const vectorized::IColumn*> agg_columns;
- for (int j = 0; j < _shared_state->agg_input_columns[i].size(); ++j) {
-
agg_columns.push_back(_shared_state->agg_input_columns[i][j].get());
- }
- _agg_functions[i]->function()->add_range_single_place(
- partition_start, partition_end, frame_start, frame_end,
- _fn_place_ptr + _offsets_of_aggregate_states[i],
agg_columns.data(),
- _agg_arena_pool.get());
-
- // If the end is not greater than the start, the current window should
be empty.
- _current_window_empty =
- std::min(frame_end, partition_end) <= std::max(frame_start,
partition_start);
- }
-}
-
-void AnalyticLocalState::_insert_result_info(int64_t current_block_rows) {
- int64_t current_block_row_pos =
-
_shared_state->input_block_first_row_positions[_output_block_index];
- int64_t get_result_start = _shared_state->current_row_position -
current_block_row_pos;
- if (_parent->cast<AnalyticSourceOperatorX>()._fn_scope ==
AnalyticFnScope::PARTITION) {
- int64_t get_result_end =
- std::min<int64_t>(_shared_state->current_row_position +
current_block_rows,
- _shared_state->partition_by_end.pos);
- _window_end_position =
- std::min<int64_t>(get_result_end - current_block_row_pos,
current_block_rows);
- _shared_state->current_row_position += (_window_end_position -
get_result_start);
- } else if (_parent->cast<AnalyticSourceOperatorX>()._fn_scope ==
AnalyticFnScope::RANGE) {
- _window_end_position =
- std::min<int64_t>(_order_by_end.pos - current_block_row_pos,
current_block_rows);
- _shared_state->current_row_position += (_window_end_position -
get_result_start);
- } else {
- _window_end_position++;
- _shared_state->current_row_position++;
- }
-
- for (size_t i = 0; i < _agg_functions_size; ++i) {
- for (size_t j = get_result_start; j < _window_end_position; ++j) {
- if (_result_column_nullable_flags[i]) {
- if (_current_window_empty) {
- _result_window_columns[i]->insert_default();
- } else {
- auto* dst = assert_cast<vectorized::ColumnNullable*>(
- _result_window_columns[i].get());
- dst->get_null_map_data().push_back(0);
- _agg_functions[i]->insert_result_info(
- _fn_place_ptr + _offsets_of_aggregate_states[i],
- &dst->get_nested_column());
- }
- } else {
- _agg_functions[i]->insert_result_info(
- _fn_place_ptr + _offsets_of_aggregate_states[i],
- _result_window_columns[i].get());
- }
- }
- }
-}
-
-Status AnalyticLocalState::_get_next_for_rows(size_t current_block_rows) {
- SCOPED_TIMER(_get_next_timer);
- while (_shared_state->current_row_position <
_shared_state->partition_by_end.pos &&
- _window_end_position < current_block_rows) {
- int64_t range_start, range_end;
- if
(!_parent->cast<AnalyticSourceOperatorX>()._window.__isset.window_start &&
- _parent->cast<AnalyticSourceOperatorX>()._window.window_end.type ==
- TAnalyticWindowBoundaryType::CURRENT_ROW) {
- // [preceding, current_row], [current_row, following] rewrite it's
same
- // as could reuse the previous calculate result, so don't call
_reset_agg_status function
- // going on calculate, add up data, no need to reset state
- range_start = _shared_state->current_row_position;
- range_end = _shared_state->current_row_position + 1;
- } else {
- _reset_agg_status();
- range_end = _shared_state->current_row_position + _rows_end_offset
+ 1;
- //[preceding, offset] --unbound: [preceding, following]
- if
(!_parent->cast<AnalyticSourceOperatorX>()._window.__isset.window_start) {
- range_start = _partition_by_start.pos;
- } else {
- range_start = _shared_state->current_row_position +
_rows_start_offset;
- }
- // Make sure range_start <= range_end
- range_start = std::min(range_start, range_end);
- }
- _execute_for_win_func(_partition_by_start.pos,
_shared_state->partition_by_end.pos,
- range_start, range_end);
- _insert_result_info(current_block_rows);
- }
- return Status::OK();
-}
-
-Status AnalyticLocalState::_get_next_for_partition(size_t current_block_rows) {
- SCOPED_TIMER(_get_next_timer);
- if (_next_partition) {
- _execute_for_win_func(_partition_by_start.pos,
_shared_state->partition_by_end.pos,
- _partition_by_start.pos,
_shared_state->partition_by_end.pos);
- }
- _insert_result_info(current_block_rows);
- return Status::OK();
-}
-
-Status AnalyticLocalState::_get_next_for_range(size_t current_block_rows) {
- SCOPED_TIMER(_get_next_timer);
- while (_shared_state->current_row_position <
_shared_state->partition_by_end.pos &&
- _window_end_position < current_block_rows) {
- if (_shared_state->current_row_position >= _order_by_end.pos) {
- _update_order_by_range();
- _execute_for_win_func(_partition_by_start.pos,
_shared_state->partition_by_end.pos,
- _order_by_start.pos, _order_by_end.pos);
- }
- _insert_result_info(current_block_rows);
- }
- return Status::OK();
-}
-
-void AnalyticLocalState::_update_order_by_range() {
- _order_by_start = _order_by_end;
- _order_by_end = _shared_state->partition_by_end;
- for (size_t i = 0; i <
_parent->cast<AnalyticSourceOperatorX>()._order_by_exprs_size; ++i) {
- _order_by_end =
_compare_row_to_find_end(_shared_state->ordey_by_column_idxs[i],
- _order_by_start,
_order_by_end, true);
- }
- _order_by_start.pos =
-
_shared_state->input_block_first_row_positions[_order_by_start.block_num] +
- _order_by_start.row_num;
- _order_by_end.pos =
_shared_state->input_block_first_row_positions[_order_by_end.block_num] +
- _order_by_end.row_num;
- // `_order_by_end` will be assigned to `_order_by_start` next time,
- // so make it a valid position.
- if (_order_by_end.row_num ==
_shared_state->input_blocks[_order_by_end.block_num].rows()) {
- _order_by_end.block_num++;
- _order_by_end.row_num = 0;
- }
-}
-
-void AnalyticLocalState::init_result_columns() {
- if (!_window_end_position) {
- _result_window_columns.resize(_agg_functions_size);
- for (size_t i = 0; i < _agg_functions_size; ++i) {
- _result_window_columns[i] =
- _agg_functions[i]->data_type()->create_column(); //return
type
- }
- }
-}
-
-//calculate pos have arrive partition end, so it's needed to init next
partition, and update the boundary of partition
-bool AnalyticLocalState::init_next_partition(BlockRowPos found_partition_end) {
- if ((_shared_state->current_row_position >=
_shared_state->partition_by_end.pos) &&
- ((_shared_state->partition_by_end.pos == 0) ||
- (_shared_state->partition_by_end.pos != found_partition_end.pos))) {
- _partition_by_start = _shared_state->partition_by_end;
- _shared_state->partition_by_end = found_partition_end;
- _shared_state->current_row_position = _partition_by_start.pos;
- _reset_agg_status();
- return true;
- }
- return false;
-}
-
-Status AnalyticLocalState::output_current_block(vectorized::Block* block) {
- block->swap(std::move(_shared_state->input_blocks[_output_block_index]));
- _blocks_memory_usage->add(block->allocated_bytes());
-
-
DCHECK(_parent->cast<AnalyticSourceOperatorX>()._change_to_nullable_flags.size()
==
- _result_window_columns.size());
- for (size_t i = 0; i < _result_window_columns.size(); ++i) {
- if
(_parent->cast<AnalyticSourceOperatorX>()._change_to_nullable_flags[i]) {
- block->insert({make_nullable(std::move(_result_window_columns[i])),
- make_nullable(_agg_functions[i]->data_type()), ""});
- } else {
- block->insert(
- {std::move(_result_window_columns[i]),
_agg_functions[i]->data_type(), ""});
- }
- }
-
- _output_block_index++;
- _window_end_position = 0;
-
+ _filtered_rows_counter = ADD_COUNTER(profile(), "FilteredRows",
TUnit::UNIT);
return Status::OK();
}
AnalyticSourceOperatorX::AnalyticSourceOperatorX(ObjectPool* pool, const
TPlanNode& tnode,
int operator_id, const
DescriptorTbl& descs)
- : OperatorX<AnalyticLocalState>(pool, tnode, operator_id, descs),
- _window(tnode.analytic_node.window),
- _intermediate_tuple_id(tnode.analytic_node.intermediate_tuple_id),
- _output_tuple_id(tnode.analytic_node.output_tuple_id),
- _has_window(tnode.analytic_node.__isset.window),
- _has_range_window(tnode.analytic_node.window.type ==
TAnalyticWindowType::RANGE),
- _has_window_start(tnode.analytic_node.window.__isset.window_start),
- _has_window_end(tnode.analytic_node.window.__isset.window_end),
- _partition_exprs_size(tnode.analytic_node.partition_exprs.size()),
- _order_by_exprs_size(tnode.analytic_node.order_by_exprs.size()) {
+ : OperatorX<AnalyticLocalState>(pool, tnode, operator_id, descs) {
_is_serial_operator = tnode.__isset.is_serial_operator &&
tnode.is_serial_operator;
- _fn_scope = AnalyticFnScope::PARTITION;
- if (tnode.analytic_node.__isset.window &&
- tnode.analytic_node.window.type == TAnalyticWindowType::RANGE) {
- DCHECK(!_window.__isset.window_start) << "RANGE windows must have
UNBOUNDED PRECEDING";
- DCHECK(!_window.__isset.window_end ||
- _window.window_end.type ==
TAnalyticWindowBoundaryType::CURRENT_ROW)
- << "RANGE window end bound must be CURRENT ROW or UNBOUNDED
FOLLOWING";
-
- if (_window.__isset
- .window_end) { //haven't set end, so same as PARTITION,
[unbounded preceding, unbounded following]
- _fn_scope = AnalyticFnScope::RANGE; //range: [unbounded
preceding,current row]
- }
-
- } else if (tnode.analytic_node.__isset.window) {
- if (_window.__isset.window_start || _window.__isset.window_end) {
- _fn_scope = AnalyticFnScope::ROWS;
- }
- }
}
-Status AnalyticSourceOperatorX::init(const TPlanNode& tnode, RuntimeState*
state) {
- RETURN_IF_ERROR(OperatorX<AnalyticLocalState>::init(tnode, state));
- const TAnalyticNode& analytic_node = tnode.analytic_node;
- size_t agg_size = analytic_node.analytic_functions.size();
- for (int i = 0; i < agg_size; ++i) {
- vectorized::AggFnEvaluator* evaluator = nullptr;
- // Window function treats all NullableAggregateFunction as
AlwaysNullable.
- // Its behavior is same with executed without group by key.
- // https://github.com/apache/doris/pull/40693
- RETURN_IF_ERROR(vectorized::AggFnEvaluator::create(
- _pool, analytic_node.analytic_functions[i], {}, /*wihout_key*/
true, &evaluator));
- _agg_functions.emplace_back(evaluator);
- }
-
- return Status::OK();
-}
-
-Status AnalyticSourceOperatorX::get_block(RuntimeState* state,
vectorized::Block* block,
+Status AnalyticSourceOperatorX::get_block(RuntimeState* state,
vectorized::Block* output_block,
bool* eos) {
+ RETURN_IF_CANCELLED(state);
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
- SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
- if (local_state._shared_state->input_eos &&
- (local_state._output_block_index ==
local_state._shared_state->input_blocks.size() ||
- local_state._shared_state->input_total_rows == 0)) {
- *eos = true;
- return Status::OK();
- }
-
- while (!local_state._shared_state->input_eos ||
- local_state._output_block_index <
local_state._shared_state->input_blocks.size()) {
- {
- SCOPED_TIMER(local_state._evaluation_timer);
- local_state._shared_state->found_partition_end =
local_state._get_partition_by_end();
- }
- if (local_state._refresh_need_more_input()) {
- return Status::OK();
- }
- local_state._next_partition =
-
local_state.init_next_partition(local_state._shared_state->found_partition_end);
- local_state.init_result_columns();
- size_t current_block_rows =
-
local_state._shared_state->input_blocks[local_state._output_block_index].rows();
- RETURN_IF_ERROR(local_state._executor.get_next(current_block_rows));
- if (local_state._window_end_position == current_block_rows) {
- break;
+ SCOPED_TIMER(local_state._get_next_timer);
+ output_block->clear_column_data();
+ size_t output_rows = 0;
+ {
+ std::lock_guard<std::mutex>
lock(local_state._shared_state->buffer_mutex);
+ if (!local_state._shared_state->blocks_buffer.empty()) {
+
local_state._shared_state->blocks_buffer.front().swap(*output_block);
+ local_state._shared_state->blocks_buffer.pop();
+ output_rows = output_block->rows();
+ // if the buffer has no data left and the sink is not eos, block reading and wait
for the signal again
+ RETURN_IF_ERROR(vectorized::VExprContext::filter_block(
+ local_state._conjuncts, output_block,
output_block->columns()));
+ if (local_state._shared_state->blocks_buffer.empty() &&
+ !local_state._shared_state->sink_eos) {
+ // Re-check sink_eos under sink_eos_lock: this thread may be about to call
block() while the sink is concurrently setting eos.
+ // Holding the mutex while calling block() avoids blocking by mistake after
the sink has already set eos and set this source ready.
+ std::unique_lock<std::mutex>
lc(local_state._shared_state->sink_eos_lock);
+ if (!local_state._shared_state->sink_eos) {
+ local_state._dependency->block(); // block
self source
+ local_state._dependency->set_ready_to_write(); // ready
for sink write
+ }
+ }
+ } else {
+ // iff the buffer has no data and the sink is eos, set eos
+ std::unique_lock<std::mutex>
lc(local_state._shared_state->sink_eos_lock);
+ *eos = local_state._shared_state->sink_eos;
}
}
- RETURN_IF_ERROR(local_state.output_current_block(block));
- RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block,
block->columns()));
- local_state.reached_limit(block, eos);
- return Status::OK();
-}
-
-Status AnalyticLocalState::close(RuntimeState* state) {
- SCOPED_TIMER(exec_time_counter());
- SCOPED_TIMER(_close_timer);
- if (_closed) {
- return Status::OK();
+ local_state.reached_limit(output_block, eos);
+ if (!output_block->empty()) {
+ auto return_rows = output_block->rows();
+ local_state._num_rows_returned += return_rows;
+ COUNTER_UPDATE(local_state._filtered_rows_counter, output_rows -
return_rows);
}
-
- _destroy_agg_status();
- _agg_arena_pool = nullptr;
-
- std::vector<vectorized::MutableColumnPtr> tmp_result_window_columns;
- _result_window_columns.swap(tmp_result_window_columns);
- return PipelineXLocalState<AnalyticSharedState>::close(state);
+ return Status::OK();
}
Status AnalyticSourceOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<AnalyticLocalState>::open(state));
DCHECK(_child->row_desc().is_prefix_of(_row_descriptor));
- _intermediate_tuple_desc =
state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
- _output_tuple_desc =
state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
- for (size_t i = 0; i < _agg_functions.size(); ++i) {
- SlotDescriptor* intermediate_slot_desc =
_intermediate_tuple_desc->slots()[i];
- SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[i];
- RETURN_IF_ERROR(_agg_functions[i]->prepare(state, _child->row_desc(),
- intermediate_slot_desc,
output_slot_desc));
- _agg_functions[i]->set_version(state->be_exec_version());
- _change_to_nullable_flags.push_back(output_slot_desc->is_nullable() &&
-
!_agg_functions[i]->data_type()->is_nullable());
- }
-
- _offsets_of_aggregate_states.resize(_agg_functions.size());
- for (size_t i = 0; i < _agg_functions.size(); ++i) {
- _offsets_of_aggregate_states[i] = _total_size_of_aggregate_states;
- const auto& agg_function = _agg_functions[i]->function();
- // aggregate states are aligned based on maximum requirement
- _align_aggregate_states = std::max(_align_aggregate_states,
agg_function->align_of_data());
- _total_size_of_aggregate_states += agg_function->size_of_data();
- // If not the last aggregate_state, we need pad it so that next
aggregate_state will be aligned.
- if (i + 1 < _agg_functions.size()) {
- size_t alignment_of_next_state = _agg_functions[i +
1]->function()->align_of_data();
- if ((alignment_of_next_state & (alignment_of_next_state - 1)) !=
0) {
- return Status::RuntimeError("Logical error: align_of_data is
not 2^N");
- }
- /// Extend total_size to next alignment requirement
- /// Add padding by rounding up 'total_size_of_aggregate_states' to
be a multiplier of alignment_of_next_state.
- _total_size_of_aggregate_states =
- (_total_size_of_aggregate_states + alignment_of_next_state
- 1) /
- alignment_of_next_state * alignment_of_next_state;
- }
- }
- for (auto* agg_function : _agg_functions) {
- RETURN_IF_ERROR(agg_function->open(state));
- }
return Status::OK();
}
diff --git a/be/src/pipeline/exec/analytic_source_operator.h
b/be/src/pipeline/exec/analytic_source_operator.h
index 639a27ffb7c..be1fdb2c9e5 100644
--- a/be/src/pipeline/exec/analytic_source_operator.h
+++ b/be/src/pipeline/exec/analytic_source_operator.h
@@ -27,89 +27,18 @@ class RuntimeState;
namespace pipeline {
#include "common/compile_check_begin.h"
-enum AnalyticFnScope { PARTITION, RANGE, ROWS };
class AnalyticSourceOperatorX;
class AnalyticLocalState final : public
PipelineXLocalState<AnalyticSharedState> {
public:
ENABLE_FACTORY_CREATOR(AnalyticLocalState);
AnalyticLocalState(RuntimeState* state, OperatorXBase* parent);
-
Status init(RuntimeState* state, LocalStateInfo& info) override;
- Status open(RuntimeState* state) override;
- Status close(RuntimeState* state) override;
-
- void init_result_columns();
-
- Status output_current_block(vectorized::Block* block);
-
- bool init_next_partition(BlockRowPos found_partition_end);
private:
- Status _get_next_for_rows(size_t rows);
- Status _get_next_for_range(size_t rows);
- Status _get_next_for_partition(size_t rows);
-
- void _execute_for_win_func(int64_t partition_start, int64_t partition_end,
int64_t frame_start,
- int64_t frame_end);
- void _insert_result_info(int64_t current_block_rows);
-
- void _update_order_by_range();
- bool _refresh_need_more_input() {
- auto need_more_input =
_whether_need_next_partition(_shared_state->found_partition_end);
- if (need_more_input) {
- _dependency->block();
- _dependency->set_ready_to_write();
- } else {
- _dependency->set_block_to_write();
- _dependency->set_ready();
- }
- return need_more_input;
- }
- BlockRowPos _get_partition_by_end();
- BlockRowPos _compare_row_to_find_end(int64_t idx, BlockRowPos start,
BlockRowPos end,
- bool need_check_first = false);
- bool _whether_need_next_partition(BlockRowPos& found_partition_end);
-
- void _reset_agg_status();
- void _create_agg_status();
- void _destroy_agg_status();
-
friend class AnalyticSourceOperatorX;
-
- int64_t _output_block_index;
- int64_t _window_end_position;
- bool _next_partition;
- std::vector<vectorized::MutableColumnPtr> _result_window_columns;
-
- int64_t _rows_start_offset;
- int64_t _rows_end_offset;
- vectorized::AggregateDataPtr _fn_place_ptr;
- size_t _agg_functions_size;
- bool _agg_functions_created;
- bool _current_window_empty = false;
-
- BlockRowPos _order_by_start;
- BlockRowPos _order_by_end;
- BlockRowPos _partition_by_start;
- std::unique_ptr<vectorized::Arena> _agg_arena_pool;
- std::vector<vectorized::AggFnEvaluator*> _agg_functions;
- std::vector<size_t> _offsets_of_aggregate_states;
- std::vector<bool> _result_column_nullable_flags;
-
- RuntimeProfile::Counter* _evaluation_timer = nullptr;
- RuntimeProfile::Counter* _execute_timer = nullptr;
RuntimeProfile::Counter* _get_next_timer = nullptr;
- RuntimeProfile::Counter* _get_result_timer = nullptr;
- RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage = nullptr;
-
- using vectorized_get_next = std::function<Status(size_t rows)>;
-
- struct executor {
- vectorized_get_next get_next;
- };
-
- executor _executor;
+ RuntimeProfile::Counter* _filtered_rows_counter = nullptr;
};
class AnalyticSourceOperatorX final : public OperatorX<AnalyticLocalState> {
@@ -121,39 +50,10 @@ public:
bool is_source() const override { return true; }
- Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status open(RuntimeState* state) override;
private:
friend class AnalyticLocalState;
-
- TAnalyticWindow _window;
-
- TupleId _intermediate_tuple_id;
- TupleId _output_tuple_id;
-
- bool _has_window;
- bool _has_range_window;
- bool _has_window_start;
- bool _has_window_end;
-
- std::vector<vectorized::AggFnEvaluator*> _agg_functions;
-
- AnalyticFnScope _fn_scope;
-
- TupleDescriptor* _intermediate_tuple_desc = nullptr;
- TupleDescriptor* _output_tuple_desc = nullptr;
-
- /// The offset of the n-th functions.
- std::vector<size_t> _offsets_of_aggregate_states;
- /// The total size of the row from the functions.
- size_t _total_size_of_aggregate_states = 0;
- /// The max align size for functions
- size_t _align_aggregate_states = 1;
-
- std::vector<bool> _change_to_nullable_flags;
- const size_t _partition_exprs_size;
- const size_t _order_by_exprs_size;
};
} // namespace pipeline
diff --git a/be/src/vec/aggregate_functions/aggregate_function.h
b/be/src/vec/aggregate_functions/aggregate_function.h
index d761d40c4c9..9a25056d213 100644
--- a/be/src/vec/aggregate_functions/aggregate_function.h
+++ b/be/src/vec/aggregate_functions/aggregate_function.h
@@ -233,6 +233,19 @@ public:
virtual Status verify_result_type(const bool without_key, const DataTypes&
argument_types,
const DataTypePtr result_type) const = 0;
+ // Agg functions normally insert results by push_back on the result column.
+ // Window evaluation wants to resize the column up front and write results with
+ // operator[]. Some result columns (e.g. string columns) cannot be resized and
+ // written directly that way; they would need a specialized insert_result_into_range.
+ virtual bool result_column_could_resize() const { return false; }
+
+ virtual void insert_result_into_range(ConstAggregateDataPtr __restrict
place, IColumn& to,
+ const size_t start, const size_t
end) const {
+ for (size_t i = start; i < end; ++i) {
+ insert_result_into(place, to);
+ }
+ }
+
protected:
DataTypes argument_types;
int version {};
diff --git a/be/src/vec/aggregate_functions/aggregate_function_window.h
b/be/src/vec/aggregate_functions/aggregate_function_window.h
index 5d449318b7d..4b2dfb4d65b 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_window.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_window.h
@@ -76,7 +76,18 @@ public:
}
void insert_result_into(ConstAggregateDataPtr place, IColumn& to) const
override {
- assert_cast<ColumnInt64&>(to).get_data().push_back(data(place).count);
+ assert_cast<ColumnInt64&,
TypeCheckOnRelease::DISABLE>(to).get_data().push_back(
+ doris::vectorized::WindowFunctionRowNumber::data(place).count);
+ }
+
+ bool result_column_could_resize() const override { return true; }
+
+ void insert_result_into_range(ConstAggregateDataPtr __restrict place,
IColumn& to,
+ const size_t start, const size_t end) const
override {
+ auto& column = assert_cast<ColumnInt64&,
TypeCheckOnRelease::DISABLE>(to);
+ for (size_t i = start; i < end; ++i) {
+ column.get_data()[i] =
(doris::vectorized::WindowFunctionRowNumber::data(place).count);
+ }
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena*)
const override {}
@@ -86,8 +97,8 @@ public:
struct RankData {
int64_t rank = 0;
- int64_t count = 0;
- int64_t peer_group_start = 0;
+ int64_t count = 1;
+ int64_t peer_group_start = -1;
};
class WindowFunctionRank final : public IAggregateFunctionDataHelper<RankData,
WindowFunctionRank> {
@@ -121,7 +132,18 @@ public:
}
void insert_result_into(ConstAggregateDataPtr place, IColumn& to) const
override {
- assert_cast<ColumnInt64&>(to).get_data().push_back(data(place).rank);
+ assert_cast<ColumnInt64&,
TypeCheckOnRelease::DISABLE>(to).get_data().push_back(
+ data(place).rank);
+ }
+
+ bool result_column_could_resize() const override { return true; }
+
+ void insert_result_into_range(ConstAggregateDataPtr __restrict place,
IColumn& to,
+ const size_t start, const size_t end) const
override {
+ auto& column = assert_cast<ColumnInt64&,
TypeCheckOnRelease::DISABLE>(to);
+ for (size_t i = start; i < end; ++i) {
+ column.get_data()[i] =
(doris::vectorized::WindowFunctionRank::data(place).rank);
+ }
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena*)
const override {}
@@ -131,7 +153,7 @@ public:
struct DenseRankData {
int64_t rank = 0;
- int64_t peer_group_start = 0;
+ int64_t peer_group_start = -1;
};
class WindowFunctionDenseRank final
@@ -163,7 +185,18 @@ public:
}
void insert_result_into(ConstAggregateDataPtr place, IColumn& to) const
override {
- assert_cast<ColumnInt64&>(to).get_data().push_back(data(place).rank);
+ assert_cast<ColumnInt64&,
TypeCheckOnRelease::DISABLE>(to).get_data().push_back(
+ data(place).rank);
+ }
+
+ bool result_column_could_resize() const override { return true; }
+
+ void insert_result_into_range(ConstAggregateDataPtr __restrict place,
IColumn& to,
+ const size_t start, const size_t end) const
override {
+ auto& column = assert_cast<ColumnInt64&,
TypeCheckOnRelease::DISABLE>(to);
+ for (size_t i = start; i < end; ++i) {
+ column.get_data()[i] =
(doris::vectorized::WindowFunctionDenseRank::data(place).rank);
+ }
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena*)
const override {}
@@ -173,8 +206,8 @@ public:
struct PercentRankData {
int64_t rank = 0;
- int64_t count = 0;
- int64_t peer_group_start = 0;
+ int64_t count = 1;
+ int64_t peer_group_start = -1;
int64_t partition_size = 0;
};
@@ -219,7 +252,19 @@ public:
void insert_result_into(ConstAggregateDataPtr place, IColumn& to) const
override {
auto percent_rank = _cal_percent(data(place).rank,
data(place).partition_size);
- assert_cast<ColumnFloat64&>(to).get_data().push_back(percent_rank);
+ assert_cast<ColumnFloat64&,
TypeCheckOnRelease::DISABLE>(to).get_data().push_back(
+ percent_rank);
+ }
+
+ bool result_column_could_resize() const override { return true; }
+
+ void insert_result_into_range(ConstAggregateDataPtr __restrict place,
IColumn& to,
+ const size_t start, const size_t end) const
override {
+ auto& column = assert_cast<ColumnFloat64&,
TypeCheckOnRelease::DISABLE>(to);
+ auto percent_rank = _cal_percent(data(place).rank,
data(place).partition_size);
+ for (size_t i = start; i < end; ++i) {
+ column.get_data()[i] = percent_rank;
+ }
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena*)
const override {}
@@ -230,7 +275,7 @@ public:
struct CumeDistData {
int64_t numerator = 0;
int64_t denominator = 0;
- int64_t peer_group_start = 0;
+ int64_t peer_group_start = -1;
};
class WindowFunctionCumeDist final
@@ -272,7 +317,19 @@ public:
void insert_result_into(ConstAggregateDataPtr place, IColumn& to) const
override {
auto cume_dist = (double)data(place).numerator * 1.0 /
(double)data(place).denominator;
- assert_cast<ColumnFloat64&>(to).get_data().push_back(cume_dist);
+ assert_cast<ColumnFloat64&,
TypeCheckOnRelease::DISABLE>(to).get_data().push_back(
+ cume_dist);
+ }
+
+ bool result_column_could_resize() const override { return true; }
+
+ void insert_result_into_range(ConstAggregateDataPtr __restrict place,
IColumn& to,
+ const size_t start, const size_t end) const
override {
+ auto& column = assert_cast<ColumnFloat64&,
TypeCheckOnRelease::DISABLE>(to);
+ auto cume_dist = (double)data(place).numerator * 1.0 /
(double)data(place).denominator;
+ for (size_t i = start; i < end; ++i) {
+ column.get_data()[i] = cume_dist;
+ }
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena*)
const override {}
@@ -323,10 +380,20 @@ public:
void reset(AggregateDataPtr place) const override {
WindowFunctionNTile::data(place).rows = 0; }
void insert_result_into(ConstAggregateDataPtr place, IColumn& to) const
override {
- assert_cast<ColumnInt64&>(to).get_data().push_back(
+ assert_cast<ColumnInt64&,
TypeCheckOnRelease::DISABLE>(to).get_data().push_back(
WindowFunctionNTile::data(place).bucket_index);
}
+ bool result_column_could_resize() const override { return true; }
+
+ void insert_result_into_range(ConstAggregateDataPtr __restrict place,
IColumn& to,
+ const size_t start, const size_t end) const
override {
+ auto& column = assert_cast<ColumnInt64&,
TypeCheckOnRelease::DISABLE>(to);
+ for (size_t i = start; i < end; ++i) {
+ column.get_data()[i] =
WindowFunctionNTile::data(place).bucket_index;
+ }
+ }
+
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena*)
const override {}
void serialize(ConstAggregateDataPtr place, BufferWritable& buf) const
override {}
void deserialize(AggregateDataPtr place, BufferReadable& buf, Arena*)
const override {}
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 1345cac66cb..423f3355f58 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -1118,6 +1118,8 @@ struct TAnalyticNode {
9: optional Exprs.TExpr order_by_eq
10: optional bool is_colocate
+
+ 11: optional list<Exprs.TExpr> range_between_offset_exprs
}
struct TMergeNode {
diff --git
a/regression-test/data/query_p0/sql_functions/window_functions/test_column_boundary.out
b/regression-test/data/query_p0/sql_functions/window_functions/test_column_boundary.out
new file mode 100644
index 00000000000..aef12dc42c5
Binary files /dev/null and
b/regression-test/data/query_p0/sql_functions/window_functions/test_column_boundary.out
differ
diff --git
a/regression-test/suites/query_p0/sql_functions/window_functions/test_column_boundary.groovy
b/regression-test/suites/query_p0/sql_functions/window_functions/test_column_boundary.groovy
new file mode 100644
index 00000000000..bfdf501d904
--- /dev/null
+++
b/regression-test/suites/query_p0/sql_functions/window_functions/test_column_boundary.groovy
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_column_boundary") {
+ sql """ DROP TABLE IF EXISTS test_column_boundary """
+ sql """
+ CREATE TABLE IF NOT EXISTS test_column_boundary (
+ u_id int NULL COMMENT "",
+ u_city varchar(20) NULL COMMENT ""
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`u_id`, `u_city`)
+ DISTRIBUTED BY HASH(`u_id`, `u_city`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "in_memory" = "false",
+ "storage_format" = "V2"
+ );
+ """
+
+ sql """ insert into test_column_boundary select number, number + random()
from numbers("number" = "1000000"); """
+ Integer count = 0;
+ Integer maxCount = 25;
+ while (count < maxCount) {
+ sql """ insert into test_column_boundary select number, number +
random() from numbers("number" = "10000000"); """
+ count++
+ sleep(100);
+ }
+ sql """ set parallel_pipeline_task_num = 1; """
+
+ qt_sql_1 """ select count() from test_column_boundary; """ // 251000000
rows (1000000 + 25 * 10000000)
+ test {
+ // column size is too large
+ sql """ select sum(res) from (select count() over(partition by u_city)
as res from test_column_boundary) as t; """
+ exception "string column length is too large"
+ }
+ sql """ DROP TABLE IF EXISTS test_column_boundary """
+}
+
+
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]