This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f9baf9c556 [improvement](scan) Support pushdown execute expr ctx
(#15917)
f9baf9c556 is described below
commit f9baf9c556e1ecf46d2c5039310d56953eeadea7
Author: Xinyi Zou <[email protected]>
AuthorDate: Fri Mar 10 08:35:32 2023 +0800
[improvement](scan) Support pushdown execute expr ctx (#15917)
In the past, only simple predicates (slot=const), AND, LIKE, and OR (only
with bitmap index) could be pushed down to the storage layer. Scan process:
Read part of the columns first, and calculate the row ids with the simple
pushed-down predicates.
Use row ids to read the remaining columns and pass them to the scanner, and
the scanner filters the remaining predicates.
This PR additionally pushes down the remaining predicates (functions, nested
predicates, ...) from the scanner to the storage layer for filtering. Scan process:
Read part of the column first, and use the push-down simple predicate to
calculate the row ids, (same as above)
Use row ids to read the columns needed for the remaining predicates, and
use the pushed-down remaining predicates to reduce the number of row ids again.
Use row ids to read the remaining columns and pass them to the scanner.
---
be/src/olap/iterators.h | 1 +
be/src/olap/olap_common.h | 2 +
be/src/olap/reader.cpp | 1 +
be/src/olap/reader.h | 1 +
be/src/olap/rowset/beta_rowset_reader.cpp | 1 +
be/src/olap/rowset/rowset_reader_context.h | 1 +
be/src/olap/rowset/segment_v2/segment_iterator.cpp | 391 +++++++++++++++++----
be/src/olap/rowset/segment_v2/segment_iterator.h | 21 +-
be/src/runtime/runtime_state.h | 5 +
be/src/vec/core/block.cpp | 16 +-
be/src/vec/core/block.h | 2 +-
be/src/vec/exec/scan/new_olap_scan_node.cpp | 16 +-
be/src/vec/exec/scan/new_olap_scan_node.h | 4 +
be/src/vec/exec/scan/new_olap_scanner.cpp | 17 +-
be/src/vec/exec/scan/new_olap_scanner.h | 3 +-
be/src/vec/exec/scan/vscan_node.cpp | 9 +-
be/src/vec/exec/scan/vscan_node.h | 3 +
be/src/vec/exec/scan/vscanner.cpp | 7 +-
be/src/vec/exec/scan/vscanner.h | 1 +
.../java/org/apache/doris/qe/SessionVariable.java | 7 +
gensrc/thrift/PaloInternalService.thrift | 2 +
.../data/correctness/test_pushdown_common_expr.out | 49 +++
.../correctness/test_pushdown_common_expr.groovy | 86 +++++
23 files changed, 554 insertions(+), 92 deletions(-)
diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h
index 58fd8f3ef7..62422f1410 100644
--- a/be/src/olap/iterators.h
+++ b/be/src/olap/iterators.h
@@ -119,6 +119,7 @@ public:
std::vector<uint32_t>* read_orderby_key_columns = nullptr;
IOContext io_ctx;
vectorized::VExpr* remaining_vconjunct_root = nullptr;
+ vectorized::VExprContext* common_vexpr_ctxs_pushdown = nullptr;
const std::set<int32_t>* output_columns = nullptr;
// runtime state
RuntimeState* runtime_state = nullptr;
diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h
index 2ae791969b..5dfbb96a8b 100644
--- a/be/src/olap/olap_common.h
+++ b/be/src/olap/olap_common.h
@@ -306,6 +306,7 @@ struct OlapReaderStatistics {
int64_t block_init_seek_num = 0;
int64_t block_init_seek_ns = 0;
int64_t first_read_ns = 0;
+ int64_t second_read_ns = 0;
int64_t block_first_read_seek_num = 0;
int64_t block_first_read_seek_ns = 0;
int64_t lazy_read_ns = 0;
@@ -320,6 +321,7 @@ struct OlapReaderStatistics {
int64_t rows_vec_del_cond_filtered = 0;
int64_t vec_cond_ns = 0;
int64_t short_cond_ns = 0;
+ int64_t expr_filter_ns = 0;
int64_t output_col_ns = 0;
int64_t rows_key_range_filtered = 0;
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 7d03696bc7..dba7f222ac 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -225,6 +225,7 @@ Status TabletReader::_capture_rs_readers(const
ReaderParams& read_params) {
_reader_context.record_rowids = read_params.record_rowids;
_reader_context.is_key_column_group = read_params.is_key_column_group;
_reader_context.remaining_vconjunct_root =
read_params.remaining_vconjunct_root;
+ _reader_context.common_vexpr_ctxs_pushdown =
read_params.common_vexpr_ctxs_pushdown;
_reader_context.output_columns = &read_params.output_columns;
return Status::OK();
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index 41e095ade0..17f7e79286 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -113,6 +113,7 @@ public:
std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set =
nullptr;
TPushAggOp::type push_down_agg_type_opt = TPushAggOp::NONE;
vectorized::VExpr* remaining_vconjunct_root = nullptr;
+ vectorized::VExprContext* common_vexpr_ctxs_pushdown = nullptr;
// used for compaction to record row ids
bool record_rowids = false;
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp
b/be/src/olap/rowset/beta_rowset_reader.cpp
index 2ed1891892..0f4410a45f 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -66,6 +66,7 @@ Status
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
_read_options.stats = _stats;
_read_options.push_down_agg_type_opt = _context->push_down_agg_type_opt;
_read_options.remaining_vconjunct_root =
_context->remaining_vconjunct_root;
+ _read_options.common_vexpr_ctxs_pushdown =
_context->common_vexpr_ctxs_pushdown;
_read_options.rowset_id = _rowset->rowset_id();
_read_options.version = _rowset->version();
_read_options.tablet_id = _rowset->rowset_meta()->tablet_id();
diff --git a/be/src/olap/rowset/rowset_reader_context.h
b/be/src/olap/rowset/rowset_reader_context.h
index 0fce2cde12..7671a75d43 100644
--- a/be/src/olap/rowset/rowset_reader_context.h
+++ b/be/src/olap/rowset/rowset_reader_context.h
@@ -64,6 +64,7 @@ struct RowsetReaderContext {
OlapReaderStatistics* stats = nullptr;
RuntimeState* runtime_state = nullptr;
vectorized::VExpr* remaining_vconjunct_root = nullptr;
+ vectorized::VExprContext* common_vexpr_ctxs_pushdown = nullptr;
bool use_page_cache = false;
int sequence_id_idx = -1;
int batch_size = 1024;
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index dfdb340027..7d73492be7 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -32,9 +32,12 @@
#include "util/doris_metrics.h"
#include "util/key_util.h"
#include "util/simd/bits.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_const.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/data_types/data_type_number.h"
#include "vec/exprs/vliteral.h"
+#include "vec/exprs/vslot_ref.h"
namespace doris {
using namespace ErrorCode;
@@ -153,7 +156,8 @@ SegmentIterator::SegmentIterator(std::shared_ptr<Segment>
segment, const Schema&
_cur_rowid(0),
_lazy_materialization_read(false),
_inited(false),
- _estimate_row_size(true) {}
+ _estimate_row_size(true),
+ _wait_times_estimate_row_size(10) {}
SegmentIterator::~SegmentIterator() {
for (auto iter : _column_iterators) {
@@ -183,6 +187,8 @@ Status SegmentIterator::init(const StorageReadOptions&
opts) {
}
_remaining_vconjunct_root = opts.remaining_vconjunct_root;
+ _common_vexpr_ctxs_pushdown = opts.common_vexpr_ctxs_pushdown;
+ _enable_common_expr_pushdown = _common_vexpr_ctxs_pushdown ? true : false;
_column_predicate_info.reset(new ColumnPredicateInfo());
_calculate_pred_in_remaining_vconjunct_root(_remaining_vconjunct_root);
@@ -465,6 +471,22 @@ bool SegmentIterator::_is_literal_node(const
TExprNodeType::type& node_type) {
}
}
+Status SegmentIterator::_extract_common_expr_columns(vectorized::VExpr* expr) {
+ auto children = expr->children();
+ for (int i = 0; i < children.size(); ++i) {
+ RETURN_IF_ERROR(_extract_common_expr_columns(children[i]));
+ }
+
+ auto node_type = expr->node_type();
+ if (node_type == TExprNodeType::SLOT_REF) {
+ auto slot_expr = dynamic_cast<doris::vectorized::VSlotRef*>(expr);
+ _is_common_expr_column[_schema.column_id(slot_expr->column_id())] =
true;
+ _common_expr_columns.insert(_schema.column_id(slot_expr->column_id()));
+ }
+
+ return Status::OK();
+}
+
Status
SegmentIterator::_execute_predicates_except_leafnode_of_andnode(vectorized::VExpr*
expr) {
if (expr == nullptr) {
return Status::OK();
@@ -1154,6 +1176,7 @@ void SegmentIterator::_vec_init_lazy_materialization() {
}
}
+ // Step1: extract columns that can be lazy materialization
if (!_col_predicates.empty() || !del_cond_id_set.empty()) {
std::set<ColumnId> short_cir_pred_col_id_set; // using set for
distinct cid
std::set<ColumnId> vec_pred_col_id_set;
@@ -1163,7 +1186,7 @@ void SegmentIterator::_vec_init_lazy_materialization() {
_is_pred_column[cid] = true;
pred_column_ids.insert(cid);
- // Step1: check pred using short eval or vec eval
+ // check pred using short eval or vec eval
if (_can_evaluated_by_vectorized(predicate)) {
vec_pred_col_id_set.insert(predicate->column_id());
_pre_eval_block_predicate.push_back(predicate);
@@ -1195,58 +1218,92 @@ void SegmentIterator::_vec_init_lazy_materialization() {
_is_need_short_eval = true;
}
- // Step 2: check non-predicate read costs to determine whether need lazy
materialization
- // fill _non_predicate_columns.
- // After some optimization, we suppose lazy materialization is better
performance.
+ // make _schema_block_id_map
+ _schema_block_id_map.resize(_schema.columns().size());
+ for (int i = 0; i < _schema.num_column_ids(); i++) {
+ auto cid = _schema.column_id(i);
+ _schema_block_id_map[cid] = i;
+ }
+
+ // Step2: extract columns that can execute expr context
+ _is_common_expr_column.resize(_schema.columns().size(), false);
+ if (_enable_common_expr_pushdown && _remaining_vconjunct_root != nullptr) {
+ _extract_common_expr_columns(_remaining_vconjunct_root);
+ if (!_common_expr_columns.empty()) {
+ _is_need_expr_eval = true;
+ for (auto cid : _schema.column_ids()) {
+ // pred column also needs to be filtered by expr
+ if (_is_common_expr_column[cid] || _is_pred_column[cid]) {
+ auto loc = _schema_block_id_map[cid];
+ _columns_to_filter.push_back(loc);
+ }
+ }
+ }
+ }
+
+ // Step 3: fill non predicate columns and second read column
+ // if _schema columns size equal to pred_column_ids size,
lazy_materialization_read is false,
+ all columns are lazy materialization columns without non predicate
column.
+ // If common expr pushdown exists, and expr column is not contained in
lazy materialization columns,
+ // add to second read column, which will be read after lazy materialization
if (_schema.column_ids().size() > pred_column_ids.size()) {
for (auto cid : _schema.column_ids()) {
if (!_is_pred_column[cid]) {
- _non_predicate_columns.push_back(cid);
if (_is_need_vec_eval || _is_need_short_eval) {
_lazy_materialization_read = true;
}
+ if (!_is_common_expr_column[cid]) {
+ _non_predicate_columns.push_back(cid);
+ } else {
+ _second_read_column_ids.push_back(cid);
+ }
}
}
}
- // Step 3: fill column ids for read and output
+ // Step 4: fill first read columns
if (_lazy_materialization_read) {
// insert pred cid to first_read_columns
for (auto cid : pred_column_ids) {
_first_read_column_ids.push_back(cid);
}
- } else if (!_is_need_vec_eval &&
- !_is_need_short_eval) { // no pred exists, just read and output
column
+ } else if (!_is_need_vec_eval && !_is_need_short_eval &&
+ !_is_need_expr_eval) { // no pred exists, just read and output
column
for (int i = 0; i < _schema.num_column_ids(); i++) {
auto cid = _schema.column_id(i);
_first_read_column_ids.push_back(cid);
}
- } else { // pred exits, but we can eliminate lazy materialization
- // insert pred/non-pred cid to first read columns
- std::set<ColumnId> pred_id_set;
- pred_id_set.insert(_short_cir_pred_column_ids.begin(),
_short_cir_pred_column_ids.end());
- pred_id_set.insert(_vec_pred_column_ids.begin(),
_vec_pred_column_ids.end());
- std::set<ColumnId> non_pred_set(_non_predicate_columns.begin(),
- _non_predicate_columns.end());
-
- for (int i = 0; i < _schema.num_column_ids(); i++) {
- auto cid = _schema.column_id(i);
- if (pred_id_set.find(cid) != pred_id_set.end()) {
- _first_read_column_ids.push_back(cid);
- } else if (non_pred_set.find(cid) != non_pred_set.end()) {
+ } else {
+ if (_is_need_vec_eval || _is_need_short_eval) {
+ // TODO To refactor, because we suppose lazy materialization is
better performance.
+ // pred exits, but we can eliminate lazy materialization
+ // insert pred/non-pred cid to first read columns
+ std::set<ColumnId> pred_id_set;
+ pred_id_set.insert(_short_cir_pred_column_ids.begin(),
+ _short_cir_pred_column_ids.end());
+ pred_id_set.insert(_vec_pred_column_ids.begin(),
_vec_pred_column_ids.end());
+ std::set<ColumnId> non_pred_set(_non_predicate_columns.begin(),
+ _non_predicate_columns.end());
+
+ DCHECK(_second_read_column_ids.empty());
+ // _second_read_column_ids must be empty. Otherwise
_lazy_materialization_read must not false.
+ for (int i = 0; i < _schema.num_column_ids(); i++) {
+ auto cid = _schema.column_id(i);
+ if (pred_id_set.find(cid) != pred_id_set.end()) {
+ _first_read_column_ids.push_back(cid);
+ } else if (non_pred_set.find(cid) != non_pred_set.end()) {
+ _first_read_column_ids.push_back(cid);
+ // when _lazy_materialization_read = false, non-predicate
column should also be filtered by sel idx, so we regard it as pred columns
+ _is_pred_column[cid] = true;
+ }
+ }
+ } else if (_is_need_expr_eval) {
+ DCHECK(!_is_need_vec_eval && !_is_need_short_eval);
+ for (auto cid : _common_expr_columns) {
_first_read_column_ids.push_back(cid);
- // when _lazy_materialization_read = false, non-predicate
column should also be filtered by sel idx, so we regard it as pred columns
- _is_pred_column[cid] = true;
}
}
}
-
- // make _schema_block_id_map
- _schema_block_id_map.resize(_schema.columns().size());
- for (int i = 0; i < _schema.num_column_ids(); i++) {
- auto cid = _schema.column_id(i);
- _schema_block_id_map[cid] = i;
- }
}
bool SegmentIterator::_can_evaluated_by_vectorized(ColumnPredicate* predicate)
{
@@ -1281,6 +1338,9 @@ void SegmentIterator::_vec_init_char_column_id() {
do {
if (column_desc->type() == OLAP_FIELD_TYPE_CHAR) {
_char_type_idx.emplace_back(i);
+ if (i != 0) {
+ _char_type_idx_no_0.emplace_back(i);
+ }
break;
} else if (column_desc->type() != OLAP_FIELD_TYPE_ARRAY) {
break;
@@ -1355,7 +1415,7 @@ void
SegmentIterator::_output_non_pred_columns(vectorized::Block* block) {
SCOPED_RAW_TIMER(&_opts.stats->output_col_ns);
for (auto cid : _non_predicate_columns) {
auto loc = _schema_block_id_map[cid];
- // if loc < block->block->columns() means the column is delete column
and should
+ // if loc > block->columns() means the column is delete column and
should
// not output by block, so just skip the column.
if (loc < block->columns()) {
block->replace_by_position(loc,
std::move(_current_return_columns[cid]));
@@ -1536,7 +1596,7 @@ Status SegmentIterator::next_batch(vectorized::Block*
block) {
if (UNLIKELY(!_inited)) {
RETURN_IF_ERROR(_init());
_inited = true;
- if (_lazy_materialization_read || _opts.record_rowids) {
+ if (_lazy_materialization_read || _opts.record_rowids ||
_is_need_expr_eval) {
_block_rowids.resize(_opts.block_row_max);
}
_current_return_columns.resize(_schema.columns().size());
@@ -1567,14 +1627,21 @@ Status SegmentIterator::next_batch(vectorized::Block*
block) {
_current_batch_rows_read = 0;
uint32_t nrows_read_limit = _opts.block_row_max;
- if (UNLIKELY(_estimate_row_size)) {
- // read 100 rows to estimate average row size
+ if (_wait_times_estimate_row_size > 0) {
+ // first time, read 100 rows to estimate average row size, to avoid
oom caused by a single batch being too large.
+ // If no valid data is read for the first time, block_row_max is read
each time thereafter.
+ // Avoid low performance when valid data cannot be read all the time
nrows_read_limit = std::min(nrows_read_limit, (uint32_t)100);
+ _wait_times_estimate_row_size--;
}
_split_row_ranges.clear();
_split_row_ranges.reserve(nrows_read_limit / 2);
_read_columns_by_index(nrows_read_limit, _current_batch_rows_read,
- _lazy_materialization_read || _opts.record_rowids);
+ _lazy_materialization_read || _opts.record_rowids
|| _is_need_expr_eval);
+ if (std::find(_first_read_column_ids.begin(), _first_read_column_ids.end(),
+ _schema.version_col_idx()) != _first_read_column_ids.end()) {
+ _replace_version_col(_current_batch_rows_read);
+ }
_opts.stats->blocks_load += 1;
_opts.stats->raw_rows_read += _current_batch_rows_read;
@@ -1583,7 +1650,7 @@ Status SegmentIterator::next_batch(vectorized::Block*
block) {
for (int i = 0; i < block->columns(); i++) {
auto cid = _schema.column_id(i);
// todo(wb) abstract make column where
- if (!_is_pred_column[cid]) { // non-predicate
+ if (!_is_pred_column[cid]) {
block->replace_by_position(i,
std::move(_current_return_columns[cid]));
}
}
@@ -1591,23 +1658,109 @@ Status SegmentIterator::next_batch(vectorized::Block*
block) {
return Status::EndOfFile("no more data in segment");
}
- if (!_is_need_vec_eval && !_is_need_short_eval) {
- _replace_version_col(_current_batch_rows_read);
+ if (!_is_need_vec_eval && !_is_need_short_eval && !_is_need_expr_eval) {
_output_non_pred_columns(block);
_output_index_result_column(nullptr, 0, block);
} else {
- _convert_dict_code_for_predicate_if_necessary();
uint16_t selected_size = _current_batch_rows_read;
uint16_t sel_rowid_idx[selected_size];
- // step 1: evaluate vectorization predicate
- selected_size = _evaluate_vectorization_predicate(sel_rowid_idx,
selected_size);
+ if (_is_need_vec_eval || _is_need_short_eval) {
+ _convert_dict_code_for_predicate_if_necessary();
+
+ // step 1: evaluate vectorization predicate
+ selected_size = _evaluate_vectorization_predicate(sel_rowid_idx,
selected_size);
+
+ // step 2: evaluate short circuit predicate
+ // todo(wb) research whether need to read short predicate after
vectorization evaluation
+ // to reduce cost of read short circuit columns.
+ // In SSB test, it make no difference; So need more
scenarios to test
+ selected_size = _evaluate_short_circuit_predicate(sel_rowid_idx,
selected_size);
- // step 2: evaluate short circuit predicate
- // todo(wb) research whether need to read short predicate after
vectorization evaluation
- // to reduce cost of read short circuit columns.
- // In SSB test, it make no difference; So need more scenarios
to test
- selected_size = _evaluate_short_circuit_predicate(sel_rowid_idx,
selected_size);
+ if (selected_size > 0) {
+ // step 3.1: output short circuit and predicate column
+ // when lazy materialization enables, _first_read_column_ids =
distinct(_short_cir_pred_column_ids + _vec_pred_column_ids)
+ // see _vec_init_lazy_materialization
+ // todo(wb) need to tell input columnids from output columnids
+ RETURN_IF_ERROR(_output_column_by_sel_idx(block,
_first_read_column_ids,
+ sel_rowid_idx,
selected_size));
+
+ // step 3.2: read remaining expr column and evaluate it.
+ if (_is_need_expr_eval) {
+ // The predicate column contains the remaining expr
column, no need second read.
+ if (!_second_read_column_ids.empty()) {
+ SCOPED_RAW_TIMER(&_opts.stats->second_read_ns);
+ RETURN_IF_ERROR(_read_columns_by_rowids(
+ _second_read_column_ids, _block_rowids,
sel_rowid_idx,
+ selected_size, &_current_return_columns));
+ if (std::find(_second_read_column_ids.begin(),
+ _second_read_column_ids.end(),
+ _schema.version_col_idx()) !=
_second_read_column_ids.end()) {
+ _replace_version_col(selected_size);
+ }
+ for (auto cid : _second_read_column_ids) {
+ auto loc = _schema_block_id_map[cid];
+ block->replace_by_position(loc,
+
std::move(_current_return_columns[cid]));
+ }
+ }
+
+ DCHECK(block->columns() >
_schema_block_id_map[*_common_expr_columns.begin()]);
+ // block->rows() takes the size of the first column by
default. If the first column is no predicate column,
+ // it has not been read yet. add a const column that has
been read to calculate rows().
+ if (block->rows() == 0) {
+ vectorized::MutableColumnPtr col0 =
+
std::move(*block->get_by_position(0).column).mutate();
+ auto res_column = vectorized::ColumnString::create();
+ res_column->insert_data("", 0);
+ auto col_const =
vectorized::ColumnConst::create(std::move(res_column),
+
selected_size);
+ block->replace_by_position(0, std::move(col_const));
+ _output_index_result_column(sel_rowid_idx,
selected_size, block);
+
block->shrink_char_type_column_suffix_zero(_char_type_idx_no_0);
+ RETURN_IF_ERROR(_execute_common_expr(sel_rowid_idx,
selected_size, block));
+ block->replace_by_position(0, std::move(col0));
+ } else {
+ _output_index_result_column(sel_rowid_idx,
selected_size, block);
+
block->shrink_char_type_column_suffix_zero(_char_type_idx);
+ RETURN_IF_ERROR(_execute_common_expr(sel_rowid_idx,
selected_size, block));
+ }
+ }
+ } else if (_is_need_expr_eval) {
+ for (auto cid : _second_read_column_ids) {
+ auto loc = _schema_block_id_map[cid];
+ block->replace_by_position(loc,
std::move(_current_return_columns[cid]));
+ }
+ }
+ } else if (_is_need_expr_eval) {
+ DCHECK(!_first_read_column_ids.empty());
+ // first read all rows are insert block, initialize sel_rowid_idx
to all rows.
+ for (auto cid : _first_read_column_ids) {
+ auto loc = _schema_block_id_map[cid];
+ block->replace_by_position(loc,
std::move(_current_return_columns[cid]));
+ }
+ for (uint32_t i = 0; i < selected_size; ++i) {
+ sel_rowid_idx[i] = i;
+ }
+
+ if (block->rows() == 0) {
+ vectorized::MutableColumnPtr col0 =
+ std::move(*block->get_by_position(0).column).mutate();
+ auto res_column = vectorized::ColumnString::create();
+ res_column->insert_data("", 0);
+ auto col_const =
+ vectorized::ColumnConst::create(std::move(res_column),
selected_size);
+ block->replace_by_position(0, std::move(col_const));
+ _output_index_result_column(nullptr, 0, block);
+
block->shrink_char_type_column_suffix_zero(_char_type_idx_no_0);
+ RETURN_IF_ERROR(_execute_common_expr(sel_rowid_idx,
selected_size, block));
+ block->replace_by_position(0, std::move(col0));
+ } else {
+ _output_index_result_column(nullptr, 0, block);
+ block->shrink_char_type_column_suffix_zero(_char_type_idx);
+ RETURN_IF_ERROR(_execute_common_expr(sel_rowid_idx,
selected_size, block));
+ }
+ }
if (UNLIKELY(_opts.record_rowids)) {
_sel_rowid_idx.resize(selected_size);
@@ -1617,45 +1770,33 @@ Status SegmentIterator::next_batch(vectorized::Block*
block) {
}
}
- if (!_lazy_materialization_read) {
- Status ret = Status::OK();
- if (selected_size > 0) {
- _replace_version_col(selected_size);
- ret = _output_column_by_sel_idx(block, _first_read_column_ids,
sel_rowid_idx,
- selected_size);
- }
- if (!ret.ok()) {
- return ret;
- }
+ if (_non_predicate_columns.empty()) {
// shrink char_type suffix zero data
block->shrink_char_type_column_suffix_zero(_char_type_idx);
if (UNLIKELY(_estimate_row_size) && block->rows() > 0) {
_update_max_row(block);
}
- return ret;
+ return Status::OK();
}
- // step3: read non_predicate column
- RETURN_IF_ERROR(_read_columns_by_rowids(_non_predicate_columns,
_block_rowids,
- sel_rowid_idx, selected_size,
- &_current_return_columns));
-
- _replace_version_col(selected_size);
+ // step4: read non_predicate column
+ if (selected_size > 0) {
+ RETURN_IF_ERROR(_read_columns_by_rowids(_non_predicate_columns,
_block_rowids,
+ sel_rowid_idx,
selected_size,
+ &_current_return_columns));
+ if (std::find(_non_predicate_columns.begin(),
_non_predicate_columns.end(),
+ _schema.version_col_idx()) !=
_non_predicate_columns.end()) {
+ _replace_version_col(selected_size);
+ }
+ }
- // step4: output columns
- // 4.1 output non-predicate column
+ // step5: output columns
_output_non_pred_columns(block);
- // 4.3 output short circuit and predicate column
- // when lazy materialization enables, _first_read_column_ids =
distinct(_short_cir_pred_column_ids + _vec_pred_column_ids)
- // see _vec_init_lazy_materialization
- // todo(wb) need to tell input columnids from output columnids
- if (selected_size > 0) {
- RETURN_IF_ERROR(_output_column_by_sel_idx(block,
_first_read_column_ids, sel_rowid_idx,
- selected_size));
+ if (!_is_need_expr_eval) {
+ _output_index_result_column(sel_rowid_idx, selected_size, block);
}
- _output_index_result_column(sel_rowid_idx, selected_size, block);
}
// shrink char_type suffix zero data
@@ -1680,6 +1821,106 @@ Status SegmentIterator::next_batch(vectorized::Block*
block) {
return Status::OK();
}
+Status SegmentIterator::_execute_common_expr(uint16_t* sel_rowid_idx,
uint16_t& selected_size,
+ vectorized::Block* block) {
+ SCOPED_RAW_TIMER(&_opts.stats->expr_filter_ns);
+ DCHECK(_remaining_vconjunct_root != nullptr);
+ DCHECK(block->rows() != 0);
+ size_t prev_columns = block->columns();
+ Defer defer {[&]() { vectorized::Block::erase_useless_column(block,
prev_columns); }};
+
+ int result_column_id = -1;
+ RETURN_IF_ERROR(_common_vexpr_ctxs_pushdown->execute(block,
&result_column_id));
+ vectorized::ColumnPtr filter_column =
block->get_by_position(result_column_id).column;
+ if (auto* nullable_column =
+
vectorized::check_and_get_column<vectorized::ColumnNullable>(*filter_column)) {
+ vectorized::ColumnPtr nested_column =
nullable_column->get_nested_column_ptr();
+
+ vectorized::MutableColumnPtr mutable_holder =
+ nested_column->use_count() == 1
+ ? nested_column->assume_mutable()
+ : nested_column->clone_resized(nested_column->size());
+
+ vectorized::ColumnUInt8* concrete_column =
+ typeid_cast<vectorized::ColumnUInt8*>(mutable_holder.get());
+ if (!concrete_column) {
+ return Status::InvalidArgument(
+ "Illegal type {} of column for filter. Must be UInt8 or
Nullable(UInt8).",
+ filter_column->get_name());
+ }
+ auto* __restrict null_map =
nullable_column->get_null_map_data().data();
+ vectorized::IColumn::Filter& filter = concrete_column->get_data();
+ auto* __restrict filter_data = filter.data();
+
+ const size_t size = filter.size();
+ for (size_t i = 0; i < size; ++i) {
+ filter_data[i] &= !null_map[i];
+ }
+
+ selected_size = _evaluate_common_expr_filter(sel_rowid_idx,
selected_size, filter);
+ vectorized::Block::filter_block_internal(block, _columns_to_filter,
filter);
+ } else if (auto* const_column =
+
vectorized::check_and_get_column<vectorized::ColumnConst>(*filter_column)) {
+ bool ret = const_column->get_bool(0);
+ if (!ret) {
+ for (auto& col : _columns_to_filter) {
+
std::move(*block->get_by_position(col).column).assume_mutable()->clear();
+ }
+ selected_size = 0;
+ }
+ } else {
+ const vectorized::IColumn::Filter& filter =
+ assert_cast<const
doris::vectorized::ColumnVector<vectorized::UInt8>&>(
+ *filter_column)
+ .get_data();
+ selected_size = _evaluate_common_expr_filter(sel_rowid_idx,
selected_size, filter);
+ vectorized::Block::filter_block_internal(block, _columns_to_filter,
filter);
+ }
+ return Status::OK();
+}
+
+uint16_t SegmentIterator::_evaluate_common_expr_filter(uint16_t* sel_rowid_idx,
+ uint16_t selected_size,
+ const
vectorized::IColumn::Filter& filter) {
+ size_t count = filter.size() -
simd::count_zero_num((int8_t*)filter.data(), filter.size());
+ if (count == 0) {
+ return 0;
+ } else {
+ const vectorized::UInt8* filt_pos = filter.data();
+
+ uint16_t new_size = 0;
+ uint32_t sel_pos = 0;
+ const uint32_t sel_end = selected_size;
+ static constexpr size_t SIMD_BYTES = 32;
+ const uint32_t sel_end_simd = sel_pos + selected_size / SIMD_BYTES *
SIMD_BYTES;
+
+ while (sel_pos < sel_end_simd) {
+ auto mask = simd::bytes32_mask_to_bits32_mask(filt_pos + sel_pos);
+ if (0 == mask) {
+ //pass
+ } else if (0xffffffff == mask) {
+ for (uint32_t i = 0; i < SIMD_BYTES; i++) {
+ sel_rowid_idx[new_size++] = sel_rowid_idx[sel_pos + i];
+ }
+ } else {
+ while (mask) {
+ const size_t bit_pos = __builtin_ctzll(mask);
+ sel_rowid_idx[new_size++] = sel_rowid_idx[sel_pos +
bit_pos];
+ mask = mask & (mask - 1);
+ }
+ }
+ sel_pos += SIMD_BYTES;
+ }
+
+ for (; sel_pos < sel_end; sel_pos++) {
+ if (filt_pos[sel_pos]) {
+ sel_rowid_idx[new_size++] = sel_rowid_idx[sel_pos];
+ }
+ }
+ return new_size;
+ }
+}
+
void SegmentIterator::_output_index_result_column(uint16_t* sel_rowid_idx,
uint16_t select_size,
vectorized::Block* block) {
SCOPED_RAW_TIMER(&_opts.stats->output_index_result_column_timer);
@@ -1776,7 +2017,7 @@ Status
SegmentIterator::current_block_row_locations(std::vector<RowLocation>* bl
DCHECK(_opts.record_rowids);
DCHECK_GE(_block_rowids.size(), _current_batch_rows_read);
uint32_t sid = segment_id();
- if (!_is_need_vec_eval && !_is_need_short_eval) {
+ if (!_is_need_vec_eval && !_is_need_short_eval && !_is_need_expr_eval) {
block_row_locations->resize(_current_batch_rows_read);
for (auto i = 0; i < _current_batch_rows_read; i++) {
(*block_row_locations)[i] = RowLocation(sid, _block_rowids[i]);
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h
b/be/src/olap/rowset/segment_v2/segment_iterator.h
index 9eba9637c8..17af761071 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.h
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.h
@@ -170,7 +170,6 @@ private:
Status _execute_compound_fn(const std::string& function_name);
bool _is_literal_node(const TExprNodeType::type& node_type);
- void _init_lazy_materialization();
void _vec_init_lazy_materialization();
// TODO: Fix Me
// CHAR type in storage layer padding the 0 in length. But query engine
need ignore the padding 0.
@@ -212,6 +211,12 @@ private:
bool _can_evaluated_by_vectorized(ColumnPredicate* predicate);
+ Status _extract_common_expr_columns(vectorized::VExpr* expr);
+ Status _execute_common_expr(uint16_t* sel_rowid_idx, uint16_t&
selected_size,
+ vectorized::Block* block);
+ uint16_t _evaluate_common_expr_filter(uint16_t* sel_rowid_idx, uint16_t
selected_size,
+ const vectorized::IColumn::Filter&
filter);
+
// Dictionary column should do something to initial.
void _convert_dict_code_for_predicate_if_necessary();
@@ -317,15 +322,15 @@ private:
// --------------------------------------------
// whether lazy materialization read should be used.
bool _lazy_materialization_read;
- // columns to read before predicate evaluation
- std::vector<ColumnId> _predicate_columns;
- // columns to read after predicate evaluation
+ // columns to read after predicate evaluation and remaining expr execute
std::vector<ColumnId> _non_predicate_columns;
+ std::set<ColumnId> _common_expr_columns;
// remember the rowids we've read for the current row block.
// could be a local variable of next_batch(), kept here to reuse vector
memory
std::vector<rowid_t> _block_rowids;
bool _is_need_vec_eval = false;
bool _is_need_short_eval = false;
+ bool _is_need_expr_eval = false;
// fields for vectorization execution
std::vector<ColumnId>
@@ -334,6 +339,7 @@ private:
_short_cir_pred_column_ids; // keep columnId of columns for short
circuit predicate evaluation
std::vector<bool> _is_pred_column; // columns hold by segmentIter
std::map<uint32_t, bool> _need_read_data_indices;
+ std::vector<bool> _is_common_expr_column;
vectorized::MutableColumns _current_return_columns;
std::vector<ColumnPredicate*> _pre_eval_block_predicate;
std::vector<ColumnPredicate*> _short_cir_eval_predicate;
@@ -344,16 +350,22 @@ private:
// second, read non-predicate columns
// so we need a field to stand for columns first time to read
std::vector<ColumnId> _first_read_column_ids;
+ std::vector<ColumnId> _second_read_column_ids;
+ std::vector<ColumnId> _columns_to_filter;
std::vector<int> _schema_block_id_map; // map from schema column id to
column idx in Block
// the actual init process is delayed to the first call to next_batch()
bool _inited;
bool _estimate_row_size;
+ // Read up to 100 rows at a time while waiting for the estimated row size.
+ int _wait_times_estimate_row_size;
StorageReadOptions _opts;
// make a copy of `_opts.column_predicates` in order to make local changes
std::vector<ColumnPredicate*> _col_predicates;
std::vector<ColumnPredicate*> _col_preds_except_leafnode_of_andnode;
+ doris::vectorized::VExprContext* _common_vexpr_ctxs_pushdown;
+ bool _enable_common_expr_pushdown = false;
doris::vectorized::VExpr* _remaining_vconjunct_root;
std::vector<roaring::Roaring>
_pred_except_leafnode_of_andnode_evaluate_result;
std::unique_ptr<ColumnPredicateInfo> _column_predicate_info;
@@ -378,6 +390,7 @@ private:
// char_type or array<char> type columns cid
std::vector<size_t> _char_type_idx;
+ std::vector<size_t> _char_type_idx_no_0;
// number of rows read in the current batch
uint32_t _current_batch_rows_read = 0;
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 029c74d27d..d309390d6a 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -118,6 +118,11 @@ public:
_query_options.check_overflow_for_decimal;
}
+ bool enable_common_expr_pushdown() const {
+ return _query_options.__isset.enable_common_expr_pushdown &&
+ _query_options.enable_common_expr_pushdown;
+ }
+
Status query_status() {
std::lock_guard<std::mutex> l(_process_status_lock);
return _process_status;
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index cc09d53cd2..804ef8c9db 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -321,8 +321,8 @@ void Block::check_number_of_rows(bool allow_null_columns)
const {
if (rows == -1) {
rows = size;
} else if (rows != size) {
- LOG(FATAL) << fmt::format("Sizes of columns doesn't match:
{}:{},{}:{}",
- data.front().name, rows, elem.name,
size);
+ LOG(FATAL) << fmt::format("Sizes of columns doesn't match:
{}:{},{}:{}, col size: {}",
+ data.front().name, rows, elem.name,
size, each_col_size());
}
}
}
@@ -337,7 +337,7 @@ size_t Block::rows() const {
return 0;
}
-std::string Block::each_col_size() {
+std::string Block::each_col_size() const {
std::stringstream ss;
for (const auto& elem : data) {
if (elem.column) {
@@ -438,6 +438,11 @@ std::string Block::dump_data(size_t begin, size_t
row_limit) const {
// content
for (size_t row_num = begin; row_num < rows() && row_num < row_limit +
begin; ++row_num) {
for (size_t i = 0; i < columns(); ++i) {
+ if (data[i].column->empty()) {
+ out << std::setfill(' ') << std::setw(1) << "|" <<
std::setw(headers_size[i])
+ << std::right;
+ continue;
+ }
std::string s = "";
if (data[i].column) {
s = data[i].to_string(row_num);
@@ -959,6 +964,11 @@ std::string MutableBlock::dump_data(size_t row_limit)
const {
// content
for (size_t row_num = 0; row_num < rows() && row_num < row_limit;
++row_num) {
for (size_t i = 0; i < columns(); ++i) {
+ if (_columns[i].get()->empty()) {
+ out << std::setfill(' ') << std::setw(1) << "|" <<
std::setw(headers_size[i])
+ << std::right;
+ continue;
+ }
std::string s = _data_types[i]->to_string(*_columns[i].get(),
row_num);
if (s.length() > headers_size[i]) {
s = s.substr(0, headers_size[i] - 3) + "...";
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index ba1809f19a..bcaf6fa7d2 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -200,7 +200,7 @@ public:
/// Returns number of rows from first column in block, not equal to
nullptr. If no columns, returns 0.
size_t rows() const;
- std::string each_col_size();
+ std::string each_col_size() const;
// Cut the rows in block, use in LIMIT operation
void set_num_rows(size_t length);
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp
b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index 0ea05409cb..8761ca380c 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -80,7 +80,9 @@ Status NewOlapScanNode::_init_profile() {
_rows_vec_cond_counter = ADD_COUNTER(_segment_profile,
"RowsVectorPredFiltered", TUnit::UNIT);
_vec_cond_timer = ADD_TIMER(_segment_profile, "VectorPredEvalTime");
_short_cond_timer = ADD_TIMER(_segment_profile, "ShortPredEvalTime");
+ _expr_filter_timer = ADD_TIMER(_segment_profile, "ExprFilterEvalTime");
_first_read_timer = ADD_TIMER(_segment_profile, "FirstReadTime");
+ _second_read_timer = ADD_TIMER(_segment_profile, "SecondReadTime");
_first_read_seek_timer = ADD_TIMER(_segment_profile, "FirstReadSeekTime");
_first_read_seek_counter = ADD_COUNTER(_segment_profile,
"FirstReadSeekCount", TUnit::UNIT);
@@ -337,6 +339,14 @@ Status
NewOlapScanNode::_should_push_down_function_filter(VectorizedFnCall* fn_c
return Status::OK();
}
+bool NewOlapScanNode::_should_push_down_common_expr() {
+ return _state->enable_common_expr_pushdown() &&
+ (_olap_scan_node.keyType == TKeysType::DUP_KEYS ||
+ (_olap_scan_node.keyType == TKeysType::UNIQUE_KEYS &&
+ _olap_scan_node.__isset.enable_unique_key_merge_on_write &&
+ _olap_scan_node.enable_unique_key_merge_on_write));
+}
+
// PlanFragmentExecutor will call this method to set scan range
// Doris scan range is defined in thrift file like this
// struct TPaloScanRange {
@@ -437,9 +447,9 @@ Status
NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) {
// add scanner to pool before doing prepare.
// so that scanner can be automatically deconstructed if prepare
failed.
_scanner_pool.add(scanner);
- RETURN_IF_ERROR(scanner->prepare(*scan_range, scanner_ranges,
_vconjunct_ctx_ptr.get(),
- _olap_filters, _filter_predicates,
- _push_down_functions));
+ RETURN_IF_ERROR(scanner->prepare(
+ *scan_range, scanner_ranges, _vconjunct_ctx_ptr.get(),
_olap_filters,
+ _filter_predicates, _push_down_functions,
_common_vexpr_ctxs_pushdown.get()));
scanners->push_back((VScanner*)scanner);
disk_set.insert(scanner->scan_disk());
}
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h
b/be/src/vec/exec/scan/new_olap_scan_node.h
index 3043fe60f7..1363b88947 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.h
+++ b/be/src/vec/exec/scan/new_olap_scan_node.h
@@ -55,6 +55,8 @@ protected:
PushDownType _should_push_down_is_null_predicate() override { return
PushDownType::ACCEPTABLE; }
+ bool _should_push_down_common_expr() override;
+
Status _init_scanners(std::list<VScanner*>* scanners) override;
private:
@@ -91,6 +93,7 @@ private:
RuntimeProfile::Counter* _rows_vec_cond_counter = nullptr;
RuntimeProfile::Counter* _vec_cond_timer = nullptr;
RuntimeProfile::Counter* _short_cond_timer = nullptr;
+ RuntimeProfile::Counter* _expr_filter_timer = nullptr;
RuntimeProfile::Counter* _output_col_timer = nullptr;
RuntimeProfile::Counter* _stats_filtered_counter = nullptr;
@@ -109,6 +112,7 @@ private:
RuntimeProfile::Counter* _block_init_seek_counter = nullptr;
RuntimeProfile::Counter* _block_conditions_filtered_timer = nullptr;
RuntimeProfile::Counter* _first_read_timer = nullptr;
+ RuntimeProfile::Counter* _second_read_timer = nullptr;
RuntimeProfile::Counter* _first_read_seek_timer = nullptr;
RuntimeProfile::Counter* _first_read_seek_counter = nullptr;
RuntimeProfile::Counter* _lazy_read_timer = nullptr;
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 1eea0afaa8..9a503bd16d 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -51,8 +51,13 @@ Status NewOlapScanner::prepare(const TPaloScanRange&
scan_range,
VExprContext** vconjunct_ctx_ptr,
const std::vector<TCondition>& filters,
const FilterPredicates& filter_predicates,
- const std::vector<FunctionFilter>&
function_filters) {
+ const std::vector<FunctionFilter>&
function_filters,
+ VExprContext** common_vexpr_ctxs_pushdown) {
RETURN_IF_ERROR(VScanner::prepare(_state, vconjunct_ctx_ptr));
+ if (common_vexpr_ctxs_pushdown != nullptr) {
+ // Copy common_vexpr_ctxs_pushdown from the scan node into this scanner's
_common_vexpr_ctxs_pushdown, only when necessary.
+ RETURN_IF_ERROR((*common_vexpr_ctxs_pushdown)->clone(_state,
&_common_vexpr_ctxs_pushdown));
+ }
// set limit to reduce end of rowset and segment mem use
_tablet_reader = std::make_unique<BlockReader>();
@@ -209,8 +214,14 @@ Status NewOlapScanner::_init_tablet_reader_params(
real_parent->_olap_scan_node.push_down_agg_type_opt;
}
_tablet_reader_params.version = Version(0, _version);
+ // TODO: If a new runtime filter arrives after `_vconjunct_ctx` move to
`_common_vexpr_ctxs_pushdown`,
+ // `_vconjunct_ctx` and `_common_vexpr_ctxs_pushdown` will have values at
the same time,
+ // and the root() of `_vconjunct_ctx` and `_common_vexpr_ctxs_pushdown`
should be merged as `remaining_vconjunct_root`
_tablet_reader_params.remaining_vconjunct_root =
- (_vconjunct_ctx == nullptr) ? nullptr : _vconjunct_ctx->root();
+ (_common_vexpr_ctxs_pushdown == nullptr)
+ ? (_vconjunct_ctx == nullptr ? nullptr :
_vconjunct_ctx->root())
+ : _common_vexpr_ctxs_pushdown->root();
+ _tablet_reader_params.common_vexpr_ctxs_pushdown =
_common_vexpr_ctxs_pushdown;
_tablet_reader_params.output_columns =
((NewOlapScanNode*)_parent)->_maybe_read_column_ids;
// Condition
@@ -471,12 +482,14 @@ void NewOlapScanner::_update_counters_before_close() {
_raw_rows_read += _tablet_reader->mutable_stats()->raw_rows_read;
COUNTER_UPDATE(olap_parent->_vec_cond_timer, stats.vec_cond_ns);
COUNTER_UPDATE(olap_parent->_short_cond_timer, stats.short_cond_ns);
+ COUNTER_UPDATE(olap_parent->_expr_filter_timer, stats.expr_filter_ns);
COUNTER_UPDATE(olap_parent->_block_init_timer, stats.block_init_ns);
COUNTER_UPDATE(olap_parent->_block_init_seek_timer,
stats.block_init_seek_ns);
COUNTER_UPDATE(olap_parent->_block_init_seek_counter,
stats.block_init_seek_num);
COUNTER_UPDATE(olap_parent->_block_conditions_filtered_timer,
stats.block_conditions_filtered_ns);
COUNTER_UPDATE(olap_parent->_first_read_timer, stats.first_read_ns);
+ COUNTER_UPDATE(olap_parent->_second_read_timer, stats.second_read_ns);
COUNTER_UPDATE(olap_parent->_first_read_seek_timer,
stats.block_first_read_seek_ns);
COUNTER_UPDATE(olap_parent->_first_read_seek_counter,
stats.block_first_read_seek_num);
COUNTER_UPDATE(olap_parent->_lazy_read_timer, stats.lazy_read_ns);
diff --git a/be/src/vec/exec/scan/new_olap_scanner.h
b/be/src/vec/exec/scan/new_olap_scanner.h
index 1b3bfcb364..83968e2535 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.h
+++ b/be/src/vec/exec/scan/new_olap_scanner.h
@@ -44,7 +44,8 @@ public:
Status prepare(const TPaloScanRange& scan_range, const
std::vector<OlapScanRange*>& key_ranges,
VExprContext** vconjunct_ctx_ptr, const
std::vector<TCondition>& filters,
const FilterPredicates& filter_predicates,
- const std::vector<FunctionFilter>& function_filters);
+ const std::vector<FunctionFilter>& function_filters,
+ VExprContext** common_vexpr_ctxs_pushdown);
const std::string& scan_disk() const { return _tablet->data_dir()->path();
}
diff --git a/be/src/vec/exec/scan/vscan_node.cpp
b/be/src/vec/exec/scan/vscan_node.cpp
index 6093c6314d..e7b46ca659 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -388,6 +388,9 @@ void VScanNode::release_resource(RuntimeState* state) {
for (auto& ctx : _stale_vexpr_ctxs) {
(*ctx)->close(state);
}
+ if (_common_vexpr_ctxs_pushdown) {
+ (*_common_vexpr_ctxs_pushdown)->close(state);
+ }
_scanner_pool.clear();
ExecNode::release_resource(state);
@@ -457,7 +460,11 @@ Status VScanNode::_normalize_conjuncts() {
RETURN_IF_ERROR(_normalize_predicate((*_vconjunct_ctx_ptr)->root(), &new_root));
if (new_root) {
(*_vconjunct_ctx_ptr)->set_root(new_root);
- } else {
+ if (_should_push_down_common_expr()) {
+ _common_vexpr_ctxs_pushdown =
std::move(_vconjunct_ctx_ptr);
+ _vconjunct_ctx_ptr.reset(nullptr);
+ }
+ } else { // All conjuncts are pushed down as predicate columns
_stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr));
_vconjunct_ctx_ptr.reset(nullptr);
}
diff --git a/be/src/vec/exec/scan/vscan_node.h
b/be/src/vec/exec/scan/vscan_node.h
index 2d6e287af0..23b1e59b51 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -164,6 +164,8 @@ protected:
return Status::OK();
}
+ virtual bool _should_push_down_common_expr() { return false; }
+
virtual PushDownType _should_push_down_bloom_filter() { return
PushDownType::UNACCEPTABLE; }
virtual PushDownType _should_push_down_bitmap_filter() { return
PushDownType::UNACCEPTABLE; }
@@ -259,6 +261,7 @@ protected:
// Every time vconjunct_ctx_ptr is updated, the old ctx will be stored in
this vector
// so that it will be destroyed uniformly at the end of the query.
std::vector<std::unique_ptr<VExprContext*>> _stale_vexpr_ctxs;
+ std::unique_ptr<VExprContext*> _common_vexpr_ctxs_pushdown = nullptr;
// If sort info is set, push limit to each scanner;
int64_t _limit_per_scanner = -1;
diff --git a/be/src/vec/exec/scan/vscanner.cpp
b/be/src/vec/exec/scan/vscanner.cpp
index 68dbbbfe9e..180b573614 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -78,9 +78,9 @@ Status VScanner::get_block(RuntimeState* state, Block* block,
bool* eof) {
{
SCOPED_TIMER(_parent->_filter_timer);
RETURN_IF_ERROR(_filter_output_block(block));
- // record rows return (after filter) for _limit check
- _num_rows_return += block->rows();
}
+ // record rows return (after filter) for _limit check
+ _num_rows_return += block->rows();
} while (!state->is_cancelled() && block->rows() == 0 && !(*eof) &&
_num_rows_read < rows_read_threshold);
}
@@ -141,6 +141,9 @@ Status VScanner::close(RuntimeState* state) {
if (_vconjunct_ctx) {
_vconjunct_ctx->close(state);
}
+ if (_common_vexpr_ctxs_pushdown) {
+ _common_vexpr_ctxs_pushdown->close(state);
+ }
COUNTER_UPDATE(_parent->_scanner_wait_worker_timer,
_scanner_wait_worker_timer);
_is_closed = true;
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index e36968e6f9..88cac3db42 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -153,6 +153,7 @@ protected:
// Cloned from _vconjunct_ctx of scan node.
// It includes predicate in SQL and runtime filters.
VExprContext* _vconjunct_ctx = nullptr;
+ VExprContext* _common_vexpr_ctxs_pushdown = nullptr;
// Late arriving runtime filters will update _vconjunct_ctx.
// The old _vconjunct_ctx will be temporarily placed in _stale_vexpr_ctxs
// and will be destroyed at the end.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 4d7cb98994..1133cf6807 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -229,6 +229,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String ENABLE_FUNCTION_PUSHDOWN =
"enable_function_pushdown";
+ public static final String ENABLE_COMMON_EXPR_PUSHDOWN =
"enable_common_expr_pushdown";
+
public static final String FRAGMENT_TRANSMISSION_COMPRESSION_CODEC =
"fragment_transmission_compression_codec";
public static final String ENABLE_LOCAL_EXCHANGE = "enable_local_exchange";
@@ -643,6 +645,9 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = ENABLE_FUNCTION_PUSHDOWN)
public boolean enableFunctionPushdown = true;
+ @VariableMgr.VarAttr(name = ENABLE_COMMON_EXPR_PUSHDOWN, fuzzy = true)
+ public boolean enableCommonExprPushdown = true;
+
@VariableMgr.VarAttr(name = ENABLE_LOCAL_EXCHANGE, fuzzy = true)
public boolean enableLocalExchange = true;
@@ -765,6 +770,7 @@ public class SessionVariable implements Serializable,
Writable {
public void initFuzzyModeVariables() {
Random random = new Random(System.currentTimeMillis());
this.parallelExecInstanceNum = random.nextInt(8) + 1;
+ this.enableCommonExprPushdown = random.nextBoolean();
this.enableLocalExchange = random.nextBoolean();
// This will cause be dead loop, disable it first
// this.disableJoinReorder = random.nextBoolean();
@@ -1609,6 +1615,7 @@ public class SessionVariable implements Serializable,
Writable {
}
tResult.setEnableFunctionPushdown(enableFunctionPushdown);
+ tResult.setEnableCommonExprPushdown(enableCommonExprPushdown);
tResult.setCheckOverflowForDecimal(checkOverflowForDecimal);
tResult.setFragmentTransmissionCompressionCodec(fragmentTransmissionCompressionCodec);
tResult.setEnableLocalExchange(enableLocalExchange);
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index 578701004e..cafcf1044d 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -202,6 +202,8 @@ struct TQueryOptions {
// For debug purpose, skip delete bitmap when reading data
63: optional bool skip_delete_bitmap = false
64: optional bool dry_run_query = false
+
+ 65: optional bool enable_common_expr_pushdown = false;
}
diff --git a/regression-test/data/correctness/test_pushdown_common_expr.out
b/regression-test/data/correctness/test_pushdown_common_expr.out
new file mode 100644
index 0000000000..e50a453c5c
--- /dev/null
+++ b/regression-test/data/correctness/test_pushdown_common_expr.out
@@ -0,0 +1,49 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !1 --
+1 a aa
+2 b bb
+4 c cc
+8 d dd
+
+-- !2 --
+64 g gg
+
+-- !3 --
+256 i ii
+
+-- !4 --
+1 a aa
+128 h hh
+16 e ee
+2 b bb
+256 i ii
+32 f ff
+4 c cc
+64 g gg
+8 d dd
+
+-- !1 --
+1 a aa
+128 h hh
+2 b bb
+4 c cc
+8 d dd
+
+-- !2 --
+64 g gg
+
+-- !3 --
+256 i ii
+
+-- !4 --
+1 a aa
+1024 k kk
+128 h hh
+16 e ee
+2 b bb
+256 i ii
+32 f ff
+4 c cc
+64 g gg
+8 d dd
+
diff --git
a/regression-test/suites/correctness/test_pushdown_common_expr.groovy
b/regression-test/suites/correctness/test_pushdown_common_expr.groovy
new file mode 100644
index 0000000000..8819288e10
--- /dev/null
+++ b/regression-test/suites/correctness/test_pushdown_common_expr.groovy
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_pushdown_common_expr") {
+ sql """ DROP TABLE IF EXISTS t_pushdown_common_expr """
+ sql """
+ CREATE TABLE `t_pushdown_common_expr` (
+ `c1` int(11) NULL,
+ `c2` varchar(100) NULL COMMENT "",
+ `c3` varchar(100) NULL COMMENT ""
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`c1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`c1`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ sql """
+ INSERT INTO t_pushdown_common_expr VALUES
+ (1,'a','aa'),
+ (2,'b','bb'),
+ (4,'c','cc'),
+ (8,'d','dd'),
+ (16,'e','ee'),
+ (32,'f','ff'),
+ (64,'g','gg'),
+ (128,'h','hh'),
+ (256,'i','ii'),
+ (512,'j','jj'),
+ (1024,'k','kk');
+ """
+
+ sql """set batch_size=8"""
+ sql """set enable_common_expr_pushdown=true"""
+
+ order_qt_1 """
+ SELECT * FROM t_pushdown_common_expr WHERE c3 LIKE "%c%" OR c1 < 10;
+ """
+
+ order_qt_2 """
+ SELECT * FROM t_pushdown_common_expr WHERE UPPER(c2)="G" OR
UPPER(c2)="P";
+ """
+
+ order_qt_3 """
+ SELECT * FROM t_pushdown_common_expr WHERE c1 = 256 OR c1 = 100;
+ """
+
+ order_qt_4 """
+ SELECT * FROM t_pushdown_common_expr WHERE c1 < 300 OR UPPER(c2)="F"
OR c3 LIKE "%f%";
+ """
+
+ sql """set enable_common_expr_pushdown=false"""
+
+ order_qt_1 """
+ SELECT * FROM t_pushdown_common_expr WHERE c3 LIKE "%h%" OR c1 < 10;
+ """
+
+ order_qt_2 """
+ SELECT * FROM t_pushdown_common_expr WHERE UPPER(c2)="G" OR
UPPER(c2)="P";
+ """
+
+ order_qt_3 """
+ SELECT * FROM t_pushdown_common_expr WHERE c1 = 256 OR c1 = 100;
+ """
+
+ order_qt_4 """
+ SELECT * FROM t_pushdown_common_expr WHERE c1 < 300 OR UPPER(c2)="K"
OR c3 LIKE "%k%";
+ """
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]