This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f9baf9c556 [improvement](scan) Support pushdown execute expr ctx 
(#15917)
f9baf9c556 is described below

commit f9baf9c556e1ecf46d2c5039310d56953eeadea7
Author: Xinyi Zou <[email protected]>
AuthorDate: Fri Mar 10 08:35:32 2023 +0800

    [improvement](scan) Support pushdown execute expr ctx (#15917)
    
    In the past, only simple predicates (slot = const), AND, LIKE, and OR 
(bitmap index only) could be pushed down to the storage layer. The scan process was:
    
    Read a subset of the columns first, and compute the row ids using the 
simple pushed-down predicates.
    Use the row ids to read the remaining columns and pass them to the scanner; 
the scanner then filters the rows using the remaining predicates.
    This PR also pushes down the remaining predicates (functions, nested 
predicates, ...) from the scanner to the storage layer for filtering. The new scan process is:
    
    Read a subset of the columns first, and use the simple pushed-down 
predicates to compute the row ids (same as above).
    Use the row ids to read the columns needed by the remaining predicates, and 
apply the pushed-down remaining predicates to further reduce the set of row ids.
    Use row ids to read the remaining columns and pass them to the scanner.
---
 be/src/olap/iterators.h                            |   1 +
 be/src/olap/olap_common.h                          |   2 +
 be/src/olap/reader.cpp                             |   1 +
 be/src/olap/reader.h                               |   1 +
 be/src/olap/rowset/beta_rowset_reader.cpp          |   1 +
 be/src/olap/rowset/rowset_reader_context.h         |   1 +
 be/src/olap/rowset/segment_v2/segment_iterator.cpp | 391 +++++++++++++++++----
 be/src/olap/rowset/segment_v2/segment_iterator.h   |  21 +-
 be/src/runtime/runtime_state.h                     |   5 +
 be/src/vec/core/block.cpp                          |  16 +-
 be/src/vec/core/block.h                            |   2 +-
 be/src/vec/exec/scan/new_olap_scan_node.cpp        |  16 +-
 be/src/vec/exec/scan/new_olap_scan_node.h          |   4 +
 be/src/vec/exec/scan/new_olap_scanner.cpp          |  17 +-
 be/src/vec/exec/scan/new_olap_scanner.h            |   3 +-
 be/src/vec/exec/scan/vscan_node.cpp                |   9 +-
 be/src/vec/exec/scan/vscan_node.h                  |   3 +
 be/src/vec/exec/scan/vscanner.cpp                  |   7 +-
 be/src/vec/exec/scan/vscanner.h                    |   1 +
 .../java/org/apache/doris/qe/SessionVariable.java  |   7 +
 gensrc/thrift/PaloInternalService.thrift           |   2 +
 .../data/correctness/test_pushdown_common_expr.out |  49 +++
 .../correctness/test_pushdown_common_expr.groovy   |  86 +++++
 23 files changed, 554 insertions(+), 92 deletions(-)

diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h
index 58fd8f3ef7..62422f1410 100644
--- a/be/src/olap/iterators.h
+++ b/be/src/olap/iterators.h
@@ -119,6 +119,7 @@ public:
     std::vector<uint32_t>* read_orderby_key_columns = nullptr;
     IOContext io_ctx;
     vectorized::VExpr* remaining_vconjunct_root = nullptr;
+    vectorized::VExprContext* common_vexpr_ctxs_pushdown = nullptr;
     const std::set<int32_t>* output_columns = nullptr;
     // runtime state
     RuntimeState* runtime_state = nullptr;
diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h
index 2ae791969b..5dfbb96a8b 100644
--- a/be/src/olap/olap_common.h
+++ b/be/src/olap/olap_common.h
@@ -306,6 +306,7 @@ struct OlapReaderStatistics {
     int64_t block_init_seek_num = 0;
     int64_t block_init_seek_ns = 0;
     int64_t first_read_ns = 0;
+    int64_t second_read_ns = 0;
     int64_t block_first_read_seek_num = 0;
     int64_t block_first_read_seek_ns = 0;
     int64_t lazy_read_ns = 0;
@@ -320,6 +321,7 @@ struct OlapReaderStatistics {
     int64_t rows_vec_del_cond_filtered = 0;
     int64_t vec_cond_ns = 0;
     int64_t short_cond_ns = 0;
+    int64_t expr_filter_ns = 0;
     int64_t output_col_ns = 0;
 
     int64_t rows_key_range_filtered = 0;
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 7d03696bc7..dba7f222ac 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -225,6 +225,7 @@ Status TabletReader::_capture_rs_readers(const 
ReaderParams& read_params) {
     _reader_context.record_rowids = read_params.record_rowids;
     _reader_context.is_key_column_group = read_params.is_key_column_group;
     _reader_context.remaining_vconjunct_root = 
read_params.remaining_vconjunct_root;
+    _reader_context.common_vexpr_ctxs_pushdown = 
read_params.common_vexpr_ctxs_pushdown;
     _reader_context.output_columns = &read_params.output_columns;
 
     return Status::OK();
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index 41e095ade0..17f7e79286 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -113,6 +113,7 @@ public:
         std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set = 
nullptr;
         TPushAggOp::type push_down_agg_type_opt = TPushAggOp::NONE;
         vectorized::VExpr* remaining_vconjunct_root = nullptr;
+        vectorized::VExprContext* common_vexpr_ctxs_pushdown = nullptr;
 
         // used for compaction to record row ids
         bool record_rowids = false;
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp 
b/be/src/olap/rowset/beta_rowset_reader.cpp
index 2ed1891892..0f4410a45f 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -66,6 +66,7 @@ Status 
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
     _read_options.stats = _stats;
     _read_options.push_down_agg_type_opt = _context->push_down_agg_type_opt;
     _read_options.remaining_vconjunct_root = 
_context->remaining_vconjunct_root;
+    _read_options.common_vexpr_ctxs_pushdown = 
_context->common_vexpr_ctxs_pushdown;
     _read_options.rowset_id = _rowset->rowset_id();
     _read_options.version = _rowset->version();
     _read_options.tablet_id = _rowset->rowset_meta()->tablet_id();
diff --git a/be/src/olap/rowset/rowset_reader_context.h 
b/be/src/olap/rowset/rowset_reader_context.h
index 0fce2cde12..7671a75d43 100644
--- a/be/src/olap/rowset/rowset_reader_context.h
+++ b/be/src/olap/rowset/rowset_reader_context.h
@@ -64,6 +64,7 @@ struct RowsetReaderContext {
     OlapReaderStatistics* stats = nullptr;
     RuntimeState* runtime_state = nullptr;
     vectorized::VExpr* remaining_vconjunct_root = nullptr;
+    vectorized::VExprContext* common_vexpr_ctxs_pushdown = nullptr;
     bool use_page_cache = false;
     int sequence_id_idx = -1;
     int batch_size = 1024;
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp 
b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index dfdb340027..7d73492be7 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -32,9 +32,12 @@
 #include "util/doris_metrics.h"
 #include "util/key_util.h"
 #include "util/simd/bits.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_const.h"
 #include "vec/data_types/data_type_factory.hpp"
 #include "vec/data_types/data_type_number.h"
 #include "vec/exprs/vliteral.h"
+#include "vec/exprs/vslot_ref.h"
 
 namespace doris {
 using namespace ErrorCode;
@@ -153,7 +156,8 @@ SegmentIterator::SegmentIterator(std::shared_ptr<Segment> 
segment, const Schema&
           _cur_rowid(0),
           _lazy_materialization_read(false),
           _inited(false),
-          _estimate_row_size(true) {}
+          _estimate_row_size(true),
+          _wait_times_estimate_row_size(10) {}
 
 SegmentIterator::~SegmentIterator() {
     for (auto iter : _column_iterators) {
@@ -183,6 +187,8 @@ Status SegmentIterator::init(const StorageReadOptions& 
opts) {
     }
 
     _remaining_vconjunct_root = opts.remaining_vconjunct_root;
+    _common_vexpr_ctxs_pushdown = opts.common_vexpr_ctxs_pushdown;
+    _enable_common_expr_pushdown = _common_vexpr_ctxs_pushdown ? true : false;
     _column_predicate_info.reset(new ColumnPredicateInfo());
     _calculate_pred_in_remaining_vconjunct_root(_remaining_vconjunct_root);
 
@@ -465,6 +471,22 @@ bool SegmentIterator::_is_literal_node(const 
TExprNodeType::type& node_type) {
     }
 }
 
+Status SegmentIterator::_extract_common_expr_columns(vectorized::VExpr* expr) {
+    auto children = expr->children();
+    for (int i = 0; i < children.size(); ++i) {
+        RETURN_IF_ERROR(_extract_common_expr_columns(children[i]));
+    }
+
+    auto node_type = expr->node_type();
+    if (node_type == TExprNodeType::SLOT_REF) {
+        auto slot_expr = dynamic_cast<doris::vectorized::VSlotRef*>(expr);
+        _is_common_expr_column[_schema.column_id(slot_expr->column_id())] = 
true;
+        _common_expr_columns.insert(_schema.column_id(slot_expr->column_id()));
+    }
+
+    return Status::OK();
+}
+
 Status 
SegmentIterator::_execute_predicates_except_leafnode_of_andnode(vectorized::VExpr*
 expr) {
     if (expr == nullptr) {
         return Status::OK();
@@ -1154,6 +1176,7 @@ void SegmentIterator::_vec_init_lazy_materialization() {
         }
     }
 
+    // Step1: extract columns that can be lazy materialization
     if (!_col_predicates.empty() || !del_cond_id_set.empty()) {
         std::set<ColumnId> short_cir_pred_col_id_set; // using set for 
distinct cid
         std::set<ColumnId> vec_pred_col_id_set;
@@ -1163,7 +1186,7 @@ void SegmentIterator::_vec_init_lazy_materialization() {
             _is_pred_column[cid] = true;
             pred_column_ids.insert(cid);
 
-            // Step1: check pred using short eval or vec eval
+            // check pred using short eval or vec eval
             if (_can_evaluated_by_vectorized(predicate)) {
                 vec_pred_col_id_set.insert(predicate->column_id());
                 _pre_eval_block_predicate.push_back(predicate);
@@ -1195,58 +1218,92 @@ void SegmentIterator::_vec_init_lazy_materialization() {
         _is_need_short_eval = true;
     }
 
-    // Step 2: check non-predicate read costs to determine whether need lazy 
materialization
-    // fill _non_predicate_columns.
-    // After some optimization, we suppose lazy materialization is better 
performance.
+    // make _schema_block_id_map
+    _schema_block_id_map.resize(_schema.columns().size());
+    for (int i = 0; i < _schema.num_column_ids(); i++) {
+        auto cid = _schema.column_id(i);
+        _schema_block_id_map[cid] = i;
+    }
+
+    // Step2: extract columns that can execute expr context
+    _is_common_expr_column.resize(_schema.columns().size(), false);
+    if (_enable_common_expr_pushdown && _remaining_vconjunct_root != nullptr) {
+        _extract_common_expr_columns(_remaining_vconjunct_root);
+        if (!_common_expr_columns.empty()) {
+            _is_need_expr_eval = true;
+            for (auto cid : _schema.column_ids()) {
+                // pred column also needs to be filtered by expr
+                if (_is_common_expr_column[cid] || _is_pred_column[cid]) {
+                    auto loc = _schema_block_id_map[cid];
+                    _columns_to_filter.push_back(loc);
+                }
+            }
+        }
+    }
+
+    // Step 3: fill non predicate columns and second read column
+    // if _schema columns size equal to pred_column_ids size, 
lazy_materialization_read is false,
+    // all columns are lazy materialization columns without non predicte 
column.
+    // If common expr pushdown exists, and expr column is not contained in 
lazy materialization columns,
+    // add to second read column, which will be read after lazy materialization
     if (_schema.column_ids().size() > pred_column_ids.size()) {
         for (auto cid : _schema.column_ids()) {
             if (!_is_pred_column[cid]) {
-                _non_predicate_columns.push_back(cid);
                 if (_is_need_vec_eval || _is_need_short_eval) {
                     _lazy_materialization_read = true;
                 }
+                if (!_is_common_expr_column[cid]) {
+                    _non_predicate_columns.push_back(cid);
+                } else {
+                    _second_read_column_ids.push_back(cid);
+                }
             }
         }
     }
 
-    // Step 3: fill column ids for read and output
+    // Step 4: fill first read columns
     if (_lazy_materialization_read) {
         // insert pred cid to first_read_columns
         for (auto cid : pred_column_ids) {
             _first_read_column_ids.push_back(cid);
         }
-    } else if (!_is_need_vec_eval &&
-               !_is_need_short_eval) { // no pred exists, just read and output 
column
+    } else if (!_is_need_vec_eval && !_is_need_short_eval &&
+               !_is_need_expr_eval) { // no pred exists, just read and output 
column
         for (int i = 0; i < _schema.num_column_ids(); i++) {
             auto cid = _schema.column_id(i);
             _first_read_column_ids.push_back(cid);
         }
-    } else { // pred exits, but we can eliminate lazy materialization
-        // insert pred/non-pred cid to first read columns
-        std::set<ColumnId> pred_id_set;
-        pred_id_set.insert(_short_cir_pred_column_ids.begin(), 
_short_cir_pred_column_ids.end());
-        pred_id_set.insert(_vec_pred_column_ids.begin(), 
_vec_pred_column_ids.end());
-        std::set<ColumnId> non_pred_set(_non_predicate_columns.begin(),
-                                        _non_predicate_columns.end());
-
-        for (int i = 0; i < _schema.num_column_ids(); i++) {
-            auto cid = _schema.column_id(i);
-            if (pred_id_set.find(cid) != pred_id_set.end()) {
-                _first_read_column_ids.push_back(cid);
-            } else if (non_pred_set.find(cid) != non_pred_set.end()) {
+    } else {
+        if (_is_need_vec_eval || _is_need_short_eval) {
+            // TODO To refactor, because we suppose lazy materialization is 
better performance.
+            // pred exits, but we can eliminate lazy materialization
+            // insert pred/non-pred cid to first read columns
+            std::set<ColumnId> pred_id_set;
+            pred_id_set.insert(_short_cir_pred_column_ids.begin(),
+                               _short_cir_pred_column_ids.end());
+            pred_id_set.insert(_vec_pred_column_ids.begin(), 
_vec_pred_column_ids.end());
+            std::set<ColumnId> non_pred_set(_non_predicate_columns.begin(),
+                                            _non_predicate_columns.end());
+
+            DCHECK(_second_read_column_ids.empty());
+            // _second_read_column_ids must be empty. Otherwise 
_lazy_materialization_read must not false.
+            for (int i = 0; i < _schema.num_column_ids(); i++) {
+                auto cid = _schema.column_id(i);
+                if (pred_id_set.find(cid) != pred_id_set.end()) {
+                    _first_read_column_ids.push_back(cid);
+                } else if (non_pred_set.find(cid) != non_pred_set.end()) {
+                    _first_read_column_ids.push_back(cid);
+                    // when _lazy_materialization_read = false, non-predicate 
column should also be filtered by sel idx, so we regard it as pred columns
+                    _is_pred_column[cid] = true;
+                }
+            }
+        } else if (_is_need_expr_eval) {
+            DCHECK(!_is_need_vec_eval && !_is_need_short_eval);
+            for (auto cid : _common_expr_columns) {
                 _first_read_column_ids.push_back(cid);
-                // when _lazy_materialization_read = false, non-predicate 
column should also be filtered by sel idx, so we regard it as pred columns
-                _is_pred_column[cid] = true;
             }
         }
     }
-
-    // make _schema_block_id_map
-    _schema_block_id_map.resize(_schema.columns().size());
-    for (int i = 0; i < _schema.num_column_ids(); i++) {
-        auto cid = _schema.column_id(i);
-        _schema_block_id_map[cid] = i;
-    }
 }
 
 bool SegmentIterator::_can_evaluated_by_vectorized(ColumnPredicate* predicate) 
{
@@ -1281,6 +1338,9 @@ void SegmentIterator::_vec_init_char_column_id() {
         do {
             if (column_desc->type() == OLAP_FIELD_TYPE_CHAR) {
                 _char_type_idx.emplace_back(i);
+                if (i != 0) {
+                    _char_type_idx_no_0.emplace_back(i);
+                }
                 break;
             } else if (column_desc->type() != OLAP_FIELD_TYPE_ARRAY) {
                 break;
@@ -1355,7 +1415,7 @@ void 
SegmentIterator::_output_non_pred_columns(vectorized::Block* block) {
     SCOPED_RAW_TIMER(&_opts.stats->output_col_ns);
     for (auto cid : _non_predicate_columns) {
         auto loc = _schema_block_id_map[cid];
-        // if loc < block->block->columns() means the column is delete column 
and should
+        // if loc > block->columns() means the column is delete column and 
should
         // not output by block, so just skip the column.
         if (loc < block->columns()) {
             block->replace_by_position(loc, 
std::move(_current_return_columns[cid]));
@@ -1536,7 +1596,7 @@ Status SegmentIterator::next_batch(vectorized::Block* 
block) {
     if (UNLIKELY(!_inited)) {
         RETURN_IF_ERROR(_init());
         _inited = true;
-        if (_lazy_materialization_read || _opts.record_rowids) {
+        if (_lazy_materialization_read || _opts.record_rowids || 
_is_need_expr_eval) {
             _block_rowids.resize(_opts.block_row_max);
         }
         _current_return_columns.resize(_schema.columns().size());
@@ -1567,14 +1627,21 @@ Status SegmentIterator::next_batch(vectorized::Block* 
block) {
 
     _current_batch_rows_read = 0;
     uint32_t nrows_read_limit = _opts.block_row_max;
-    if (UNLIKELY(_estimate_row_size)) {
-        // read 100 rows to estimate average row size
+    if (_wait_times_estimate_row_size > 0) {
+        // first time, read 100 rows to estimate average row size, to avoid 
oom caused by a single batch being too large.
+        // If no valid data is read for the first time, block_row_max is read 
each time thereafter.
+        // Avoid low performance when valid data cannot be read all the time
         nrows_read_limit = std::min(nrows_read_limit, (uint32_t)100);
+        _wait_times_estimate_row_size--;
     }
     _split_row_ranges.clear();
     _split_row_ranges.reserve(nrows_read_limit / 2);
     _read_columns_by_index(nrows_read_limit, _current_batch_rows_read,
-                           _lazy_materialization_read || _opts.record_rowids);
+                           _lazy_materialization_read || _opts.record_rowids 
|| _is_need_expr_eval);
+    if (std::find(_first_read_column_ids.begin(), _first_read_column_ids.end(),
+                  _schema.version_col_idx()) != _first_read_column_ids.end()) {
+        _replace_version_col(_current_batch_rows_read);
+    }
 
     _opts.stats->blocks_load += 1;
     _opts.stats->raw_rows_read += _current_batch_rows_read;
@@ -1583,7 +1650,7 @@ Status SegmentIterator::next_batch(vectorized::Block* 
block) {
         for (int i = 0; i < block->columns(); i++) {
             auto cid = _schema.column_id(i);
             // todo(wb) abstract make column where
-            if (!_is_pred_column[cid]) { // non-predicate
+            if (!_is_pred_column[cid]) {
                 block->replace_by_position(i, 
std::move(_current_return_columns[cid]));
             }
         }
@@ -1591,23 +1658,109 @@ Status SegmentIterator::next_batch(vectorized::Block* 
block) {
         return Status::EndOfFile("no more data in segment");
     }
 
-    if (!_is_need_vec_eval && !_is_need_short_eval) {
-        _replace_version_col(_current_batch_rows_read);
+    if (!_is_need_vec_eval && !_is_need_short_eval && !_is_need_expr_eval) {
         _output_non_pred_columns(block);
         _output_index_result_column(nullptr, 0, block);
     } else {
-        _convert_dict_code_for_predicate_if_necessary();
         uint16_t selected_size = _current_batch_rows_read;
         uint16_t sel_rowid_idx[selected_size];
 
-        // step 1: evaluate vectorization predicate
-        selected_size = _evaluate_vectorization_predicate(sel_rowid_idx, 
selected_size);
+        if (_is_need_vec_eval || _is_need_short_eval) {
+            _convert_dict_code_for_predicate_if_necessary();
+
+            // step 1: evaluate vectorization predicate
+            selected_size = _evaluate_vectorization_predicate(sel_rowid_idx, 
selected_size);
+
+            // step 2: evaluate short circuit predicate
+            // todo(wb) research whether need to read short predicate after 
vectorization evaluation
+            //          to reduce cost of read short circuit columns.
+            //          In SSB test, it make no difference; So need more 
scenarios to test
+            selected_size = _evaluate_short_circuit_predicate(sel_rowid_idx, 
selected_size);
 
-        // step 2: evaluate short circuit predicate
-        // todo(wb) research whether need to read short predicate after 
vectorization evaluation
-        //          to reduce cost of read short circuit columns.
-        //          In SSB test, it make no difference; So need more scenarios 
to test
-        selected_size = _evaluate_short_circuit_predicate(sel_rowid_idx, 
selected_size);
+            if (selected_size > 0) {
+                // step 3.1: output short circuit and predicate column
+                // when lazy materialization enables, _first_read_column_ids = 
distinct(_short_cir_pred_column_ids + _vec_pred_column_ids)
+                // see _vec_init_lazy_materialization
+                // todo(wb) need to tell input columnids from output columnids
+                RETURN_IF_ERROR(_output_column_by_sel_idx(block, 
_first_read_column_ids,
+                                                          sel_rowid_idx, 
selected_size));
+
+                // step 3.2: read remaining expr column and evaluate it.
+                if (_is_need_expr_eval) {
+                    // The predicate column contains the remaining expr 
column, no need second read.
+                    if (!_second_read_column_ids.empty()) {
+                        SCOPED_RAW_TIMER(&_opts.stats->second_read_ns);
+                        RETURN_IF_ERROR(_read_columns_by_rowids(
+                                _second_read_column_ids, _block_rowids, 
sel_rowid_idx,
+                                selected_size, &_current_return_columns));
+                        if (std::find(_second_read_column_ids.begin(),
+                                      _second_read_column_ids.end(),
+                                      _schema.version_col_idx()) != 
_second_read_column_ids.end()) {
+                            _replace_version_col(selected_size);
+                        }
+                        for (auto cid : _second_read_column_ids) {
+                            auto loc = _schema_block_id_map[cid];
+                            block->replace_by_position(loc,
+                                                       
std::move(_current_return_columns[cid]));
+                        }
+                    }
+
+                    DCHECK(block->columns() > 
_schema_block_id_map[*_common_expr_columns.begin()]);
+                    // block->rows() takes the size of the first column by 
default. If the first column is no predicate column,
+                    // it has not been read yet. add a const column that has 
been read to calculate rows().
+                    if (block->rows() == 0) {
+                        vectorized::MutableColumnPtr col0 =
+                                
std::move(*block->get_by_position(0).column).mutate();
+                        auto res_column = vectorized::ColumnString::create();
+                        res_column->insert_data("", 0);
+                        auto col_const = 
vectorized::ColumnConst::create(std::move(res_column),
+                                                                         
selected_size);
+                        block->replace_by_position(0, std::move(col_const));
+                        _output_index_result_column(sel_rowid_idx, 
selected_size, block);
+                        
block->shrink_char_type_column_suffix_zero(_char_type_idx_no_0);
+                        RETURN_IF_ERROR(_execute_common_expr(sel_rowid_idx, 
selected_size, block));
+                        block->replace_by_position(0, std::move(col0));
+                    } else {
+                        _output_index_result_column(sel_rowid_idx, 
selected_size, block);
+                        
block->shrink_char_type_column_suffix_zero(_char_type_idx);
+                        RETURN_IF_ERROR(_execute_common_expr(sel_rowid_idx, 
selected_size, block));
+                    }
+                }
+            } else if (_is_need_expr_eval) {
+                for (auto cid : _second_read_column_ids) {
+                    auto loc = _schema_block_id_map[cid];
+                    block->replace_by_position(loc, 
std::move(_current_return_columns[cid]));
+                }
+            }
+        } else if (_is_need_expr_eval) {
+            DCHECK(!_first_read_column_ids.empty());
+            // first read all rows are insert block, initialize sel_rowid_idx 
to all rows.
+            for (auto cid : _first_read_column_ids) {
+                auto loc = _schema_block_id_map[cid];
+                block->replace_by_position(loc, 
std::move(_current_return_columns[cid]));
+            }
+            for (uint32_t i = 0; i < selected_size; ++i) {
+                sel_rowid_idx[i] = i;
+            }
+
+            if (block->rows() == 0) {
+                vectorized::MutableColumnPtr col0 =
+                        std::move(*block->get_by_position(0).column).mutate();
+                auto res_column = vectorized::ColumnString::create();
+                res_column->insert_data("", 0);
+                auto col_const =
+                        vectorized::ColumnConst::create(std::move(res_column), 
selected_size);
+                block->replace_by_position(0, std::move(col_const));
+                _output_index_result_column(nullptr, 0, block);
+                
block->shrink_char_type_column_suffix_zero(_char_type_idx_no_0);
+                RETURN_IF_ERROR(_execute_common_expr(sel_rowid_idx, 
selected_size, block));
+                block->replace_by_position(0, std::move(col0));
+            } else {
+                _output_index_result_column(nullptr, 0, block);
+                block->shrink_char_type_column_suffix_zero(_char_type_idx);
+                RETURN_IF_ERROR(_execute_common_expr(sel_rowid_idx, 
selected_size, block));
+            }
+        }
 
         if (UNLIKELY(_opts.record_rowids)) {
             _sel_rowid_idx.resize(selected_size);
@@ -1617,45 +1770,33 @@ Status SegmentIterator::next_batch(vectorized::Block* 
block) {
             }
         }
 
-        if (!_lazy_materialization_read) {
-            Status ret = Status::OK();
-            if (selected_size > 0) {
-                _replace_version_col(selected_size);
-                ret = _output_column_by_sel_idx(block, _first_read_column_ids, 
sel_rowid_idx,
-                                                selected_size);
-            }
-            if (!ret.ok()) {
-                return ret;
-            }
+        if (_non_predicate_columns.empty()) {
             // shrink char_type suffix zero data
             block->shrink_char_type_column_suffix_zero(_char_type_idx);
 
             if (UNLIKELY(_estimate_row_size) && block->rows() > 0) {
                 _update_max_row(block);
             }
-            return ret;
+            return Status::OK();
         }
 
-        // step3: read non_predicate column
-        RETURN_IF_ERROR(_read_columns_by_rowids(_non_predicate_columns, 
_block_rowids,
-                                                sel_rowid_idx, selected_size,
-                                                &_current_return_columns));
-
-        _replace_version_col(selected_size);
+        // step4: read non_predicate column
+        if (selected_size > 0) {
+            RETURN_IF_ERROR(_read_columns_by_rowids(_non_predicate_columns, 
_block_rowids,
+                                                    sel_rowid_idx, 
selected_size,
+                                                    &_current_return_columns));
+            if (std::find(_non_predicate_columns.begin(), 
_non_predicate_columns.end(),
+                          _schema.version_col_idx()) != 
_non_predicate_columns.end()) {
+                _replace_version_col(selected_size);
+            }
+        }
 
-        // step4: output columns
-        // 4.1 output non-predicate column
+        // step5: output columns
         _output_non_pred_columns(block);
 
-        // 4.3 output short circuit and predicate column
-        // when lazy materialization enables, _first_read_column_ids = 
distinct(_short_cir_pred_column_ids + _vec_pred_column_ids)
-        // see _vec_init_lazy_materialization
-        // todo(wb) need to tell input columnids from output columnids
-        if (selected_size > 0) {
-            RETURN_IF_ERROR(_output_column_by_sel_idx(block, 
_first_read_column_ids, sel_rowid_idx,
-                                                      selected_size));
+        if (!_is_need_expr_eval) {
+            _output_index_result_column(sel_rowid_idx, selected_size, block);
         }
-        _output_index_result_column(sel_rowid_idx, selected_size, block);
     }
 
     // shrink char_type suffix zero data
@@ -1680,6 +1821,106 @@ Status SegmentIterator::next_batch(vectorized::Block* 
block) {
     return Status::OK();
 }
 
+Status SegmentIterator::_execute_common_expr(uint16_t* sel_rowid_idx, 
uint16_t& selected_size,
+                                             vectorized::Block* block) {
+    SCOPED_RAW_TIMER(&_opts.stats->expr_filter_ns);
+    DCHECK(_remaining_vconjunct_root != nullptr);
+    DCHECK(block->rows() != 0);
+    size_t prev_columns = block->columns();
+    Defer defer {[&]() { vectorized::Block::erase_useless_column(block, 
prev_columns); }};
+
+    int result_column_id = -1;
+    RETURN_IF_ERROR(_common_vexpr_ctxs_pushdown->execute(block, 
&result_column_id));
+    vectorized::ColumnPtr filter_column = 
block->get_by_position(result_column_id).column;
+    if (auto* nullable_column =
+                
vectorized::check_and_get_column<vectorized::ColumnNullable>(*filter_column)) {
+        vectorized::ColumnPtr nested_column = 
nullable_column->get_nested_column_ptr();
+
+        vectorized::MutableColumnPtr mutable_holder =
+                nested_column->use_count() == 1
+                        ? nested_column->assume_mutable()
+                        : nested_column->clone_resized(nested_column->size());
+
+        vectorized::ColumnUInt8* concrete_column =
+                typeid_cast<vectorized::ColumnUInt8*>(mutable_holder.get());
+        if (!concrete_column) {
+            return Status::InvalidArgument(
+                    "Illegal type {} of column for filter. Must be UInt8 or 
Nullable(UInt8).",
+                    filter_column->get_name());
+        }
+        auto* __restrict null_map = 
nullable_column->get_null_map_data().data();
+        vectorized::IColumn::Filter& filter = concrete_column->get_data();
+        auto* __restrict filter_data = filter.data();
+
+        const size_t size = filter.size();
+        for (size_t i = 0; i < size; ++i) {
+            filter_data[i] &= !null_map[i];
+        }
+
+        selected_size = _evaluate_common_expr_filter(sel_rowid_idx, 
selected_size, filter);
+        vectorized::Block::filter_block_internal(block, _columns_to_filter, 
filter);
+    } else if (auto* const_column =
+                       
vectorized::check_and_get_column<vectorized::ColumnConst>(*filter_column)) {
+        bool ret = const_column->get_bool(0);
+        if (!ret) {
+            for (auto& col : _columns_to_filter) {
+                
std::move(*block->get_by_position(col).column).assume_mutable()->clear();
+            }
+            selected_size = 0;
+        }
+    } else {
+        const vectorized::IColumn::Filter& filter =
+                assert_cast<const 
doris::vectorized::ColumnVector<vectorized::UInt8>&>(
+                        *filter_column)
+                        .get_data();
+        selected_size = _evaluate_common_expr_filter(sel_rowid_idx, 
selected_size, filter);
+        vectorized::Block::filter_block_internal(block, _columns_to_filter, 
filter);
+    }
+    return Status::OK();
+}
+
+uint16_t SegmentIterator::_evaluate_common_expr_filter(uint16_t* sel_rowid_idx,
+                                                       uint16_t selected_size,
+                                                       const 
vectorized::IColumn::Filter& filter) {
+    size_t count = filter.size() - 
simd::count_zero_num((int8_t*)filter.data(), filter.size());
+    if (count == 0) {
+        return 0;
+    } else {
+        const vectorized::UInt8* filt_pos = filter.data();
+
+        uint16_t new_size = 0;
+        uint32_t sel_pos = 0;
+        const uint32_t sel_end = selected_size;
+        static constexpr size_t SIMD_BYTES = 32;
+        const uint32_t sel_end_simd = sel_pos + selected_size / SIMD_BYTES * 
SIMD_BYTES;
+
+        while (sel_pos < sel_end_simd) {
+            auto mask = simd::bytes32_mask_to_bits32_mask(filt_pos + sel_pos);
+            if (0 == mask) {
+                //pass
+            } else if (0xffffffff == mask) {
+                for (uint32_t i = 0; i < SIMD_BYTES; i++) {
+                    sel_rowid_idx[new_size++] = sel_rowid_idx[sel_pos + i];
+                }
+            } else {
+                while (mask) {
+                    const size_t bit_pos = __builtin_ctzll(mask);
+                    sel_rowid_idx[new_size++] = sel_rowid_idx[sel_pos + 
bit_pos];
+                    mask = mask & (mask - 1);
+                }
+            }
+            sel_pos += SIMD_BYTES;
+        }
+
+        for (; sel_pos < sel_end; sel_pos++) {
+            if (filt_pos[sel_pos]) {
+                sel_rowid_idx[new_size++] = sel_rowid_idx[sel_pos];
+            }
+        }
+        return new_size;
+    }
+}
+
 void SegmentIterator::_output_index_result_column(uint16_t* sel_rowid_idx, 
uint16_t select_size,
                                                   vectorized::Block* block) {
     SCOPED_RAW_TIMER(&_opts.stats->output_index_result_column_timer);
@@ -1776,7 +2017,7 @@ Status 
SegmentIterator::current_block_row_locations(std::vector<RowLocation>* bl
     DCHECK(_opts.record_rowids);
     DCHECK_GE(_block_rowids.size(), _current_batch_rows_read);
     uint32_t sid = segment_id();
-    if (!_is_need_vec_eval && !_is_need_short_eval) {
+    if (!_is_need_vec_eval && !_is_need_short_eval && !_is_need_expr_eval) {
         block_row_locations->resize(_current_batch_rows_read);
         for (auto i = 0; i < _current_batch_rows_read; i++) {
             (*block_row_locations)[i] = RowLocation(sid, _block_rowids[i]);
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h 
b/be/src/olap/rowset/segment_v2/segment_iterator.h
index 9eba9637c8..17af761071 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.h
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.h
@@ -170,7 +170,6 @@ private:
     Status _execute_compound_fn(const std::string& function_name);
     bool _is_literal_node(const TExprNodeType::type& node_type);
 
-    void _init_lazy_materialization();
     void _vec_init_lazy_materialization();
     // TODO: Fix Me
     // CHAR type in storage layer padding the 0 in length. But query engine 
need ignore the padding 0.
@@ -212,6 +211,12 @@ private:
 
     bool _can_evaluated_by_vectorized(ColumnPredicate* predicate);
 
+    Status _extract_common_expr_columns(vectorized::VExpr* expr);
+    Status _execute_common_expr(uint16_t* sel_rowid_idx, uint16_t& 
selected_size,
+                                vectorized::Block* block);
+    uint16_t _evaluate_common_expr_filter(uint16_t* sel_rowid_idx, uint16_t 
selected_size,
+                                          const vectorized::IColumn::Filter& 
filter);
+
     // Dictionary column should do something to initial.
     void _convert_dict_code_for_predicate_if_necessary();
 
@@ -317,15 +322,15 @@ private:
     // --------------------------------------------
     // whether lazy materialization read should be used.
     bool _lazy_materialization_read;
-    // columns to read before predicate evaluation
-    std::vector<ColumnId> _predicate_columns;
-    // columns to read after predicate evaluation
+    // columns to read after predicate evaluation and remaining expr execution
     std::vector<ColumnId> _non_predicate_columns;
+    std::set<ColumnId> _common_expr_columns;
     // remember the rowids we've read for the current row block.
     // could be a local variable of next_batch(), kept here to reuse vector 
memory
     std::vector<rowid_t> _block_rowids;
     bool _is_need_vec_eval = false;
     bool _is_need_short_eval = false;
+    bool _is_need_expr_eval = false;
 
     // fields for vectorization execution
     std::vector<ColumnId>
@@ -334,6 +339,7 @@ private:
             _short_cir_pred_column_ids; // keep columnId of columns for short 
circuit predicate evaluation
     std::vector<bool> _is_pred_column; // columns hold by segmentIter
     std::map<uint32_t, bool> _need_read_data_indices;
+    std::vector<bool> _is_common_expr_column;
     vectorized::MutableColumns _current_return_columns;
     std::vector<ColumnPredicate*> _pre_eval_block_predicate;
     std::vector<ColumnPredicate*> _short_cir_eval_predicate;
@@ -344,16 +350,22 @@ private:
     // second, read non-predicate columns
     // so we need a field to stand for columns first time to read
     std::vector<ColumnId> _first_read_column_ids;
+    std::vector<ColumnId> _second_read_column_ids;
+    std::vector<ColumnId> _columns_to_filter;
     std::vector<int> _schema_block_id_map; // map from schema column id to 
column idx in Block
 
     // the actual init process is delayed to the first call to next_batch()
     bool _inited;
     bool _estimate_row_size;
+    // Read up to 100 rows at a time while waiting for the estimated row size.
+    int _wait_times_estimate_row_size;
 
     StorageReadOptions _opts;
     // make a copy of `_opts.column_predicates` in order to make local changes
     std::vector<ColumnPredicate*> _col_predicates;
     std::vector<ColumnPredicate*> _col_preds_except_leafnode_of_andnode;
+    doris::vectorized::VExprContext* _common_vexpr_ctxs_pushdown;
+    bool _enable_common_expr_pushdown = false;
     doris::vectorized::VExpr* _remaining_vconjunct_root;
     std::vector<roaring::Roaring> 
_pred_except_leafnode_of_andnode_evaluate_result;
     std::unique_ptr<ColumnPredicateInfo> _column_predicate_info;
@@ -378,6 +390,7 @@ private:
 
     // char_type or array<char> type columns cid
     std::vector<size_t> _char_type_idx;
+    std::vector<size_t> _char_type_idx_no_0;
 
     // number of rows read in the current batch
     uint32_t _current_batch_rows_read = 0;
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 029c74d27d..d309390d6a 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -118,6 +118,11 @@ public:
                _query_options.check_overflow_for_decimal;
     }
 
+    bool enable_common_expr_pushdown() const {
+        return _query_options.__isset.enable_common_expr_pushdown &&
+               _query_options.enable_common_expr_pushdown;
+    }
+
     Status query_status() {
         std::lock_guard<std::mutex> l(_process_status_lock);
         return _process_status;
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index cc09d53cd2..804ef8c9db 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -321,8 +321,8 @@ void Block::check_number_of_rows(bool allow_null_columns) 
const {
         if (rows == -1) {
             rows = size;
         } else if (rows != size) {
-            LOG(FATAL) << fmt::format("Sizes of columns doesn't match: 
{}:{},{}:{}",
-                                      data.front().name, rows, elem.name, 
size);
+            LOG(FATAL) << fmt::format("Sizes of columns doesn't match: 
{}:{},{}:{}, col size: {}",
+                                      data.front().name, rows, elem.name, 
size, each_col_size());
         }
     }
 }
@@ -337,7 +337,7 @@ size_t Block::rows() const {
     return 0;
 }
 
-std::string Block::each_col_size() {
+std::string Block::each_col_size() const {
     std::stringstream ss;
     for (const auto& elem : data) {
         if (elem.column) {
@@ -438,6 +438,11 @@ std::string Block::dump_data(size_t begin, size_t 
row_limit) const {
     // content
     for (size_t row_num = begin; row_num < rows() && row_num < row_limit + 
begin; ++row_num) {
         for (size_t i = 0; i < columns(); ++i) {
+            if (data[i].column->empty()) {
+                out << std::setfill(' ') << std::setw(1) << "|" << 
std::setw(headers_size[i])
+                    << std::right;
+                continue;
+            }
             std::string s = "";
             if (data[i].column) {
                 s = data[i].to_string(row_num);
@@ -959,6 +964,11 @@ std::string MutableBlock::dump_data(size_t row_limit) 
const {
     // content
     for (size_t row_num = 0; row_num < rows() && row_num < row_limit; 
++row_num) {
         for (size_t i = 0; i < columns(); ++i) {
+            if (_columns[i].get()->empty()) {
+                out << std::setfill(' ') << std::setw(1) << "|" << 
std::setw(headers_size[i])
+                    << std::right;
+                continue;
+            }
             std::string s = _data_types[i]->to_string(*_columns[i].get(), 
row_num);
             if (s.length() > headers_size[i]) {
                 s = s.substr(0, headers_size[i] - 3) + "...";
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index ba1809f19a..bcaf6fa7d2 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -200,7 +200,7 @@ public:
     /// Returns number of rows from first column in block, not equal to 
nullptr. If no columns, returns 0.
     size_t rows() const;
 
-    std::string each_col_size();
+    std::string each_col_size() const;
 
     // Cut the rows in block, use in LIMIT operation
     void set_num_rows(size_t length);
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp 
b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index 0ea05409cb..8761ca380c 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -80,7 +80,9 @@ Status NewOlapScanNode::_init_profile() {
     _rows_vec_cond_counter = ADD_COUNTER(_segment_profile, 
"RowsVectorPredFiltered", TUnit::UNIT);
     _vec_cond_timer = ADD_TIMER(_segment_profile, "VectorPredEvalTime");
     _short_cond_timer = ADD_TIMER(_segment_profile, "ShortPredEvalTime");
+    _expr_filter_timer = ADD_TIMER(_segment_profile, "ExprFilterEvalTime");
     _first_read_timer = ADD_TIMER(_segment_profile, "FirstReadTime");
+    _second_read_timer = ADD_TIMER(_segment_profile, "SecondReadTime");
     _first_read_seek_timer = ADD_TIMER(_segment_profile, "FirstReadSeekTime");
     _first_read_seek_counter = ADD_COUNTER(_segment_profile, 
"FirstReadSeekCount", TUnit::UNIT);
 
@@ -337,6 +339,14 @@ Status 
NewOlapScanNode::_should_push_down_function_filter(VectorizedFnCall* fn_c
     return Status::OK();
 }
 
+bool NewOlapScanNode::_should_push_down_common_expr() {
+    return _state->enable_common_expr_pushdown() &&
+           (_olap_scan_node.keyType == TKeysType::DUP_KEYS ||
+            (_olap_scan_node.keyType == TKeysType::UNIQUE_KEYS &&
+             _olap_scan_node.__isset.enable_unique_key_merge_on_write &&
+             _olap_scan_node.enable_unique_key_merge_on_write));
+}
+
 // PlanFragmentExecutor will call this method to set scan range
 // Doris scan range is defined in thrift file like this
 // struct TPaloScanRange {
@@ -437,9 +447,9 @@ Status 
NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) {
             // add scanner to pool before doing prepare.
             // so that scanner can be automatically deconstructed if prepare 
failed.
             _scanner_pool.add(scanner);
-            RETURN_IF_ERROR(scanner->prepare(*scan_range, scanner_ranges, 
_vconjunct_ctx_ptr.get(),
-                                             _olap_filters, _filter_predicates,
-                                             _push_down_functions));
+            RETURN_IF_ERROR(scanner->prepare(
+                    *scan_range, scanner_ranges, _vconjunct_ctx_ptr.get(), 
_olap_filters,
+                    _filter_predicates, _push_down_functions, 
_common_vexpr_ctxs_pushdown.get()));
             scanners->push_back((VScanner*)scanner);
             disk_set.insert(scanner->scan_disk());
         }
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h 
b/be/src/vec/exec/scan/new_olap_scan_node.h
index 3043fe60f7..1363b88947 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.h
+++ b/be/src/vec/exec/scan/new_olap_scan_node.h
@@ -55,6 +55,8 @@ protected:
 
     PushDownType _should_push_down_is_null_predicate() override { return 
PushDownType::ACCEPTABLE; }
 
+    bool _should_push_down_common_expr() override;
+
     Status _init_scanners(std::list<VScanner*>* scanners) override;
 
 private:
@@ -91,6 +93,7 @@ private:
     RuntimeProfile::Counter* _rows_vec_cond_counter = nullptr;
     RuntimeProfile::Counter* _vec_cond_timer = nullptr;
     RuntimeProfile::Counter* _short_cond_timer = nullptr;
+    RuntimeProfile::Counter* _expr_filter_timer = nullptr;
     RuntimeProfile::Counter* _output_col_timer = nullptr;
 
     RuntimeProfile::Counter* _stats_filtered_counter = nullptr;
@@ -109,6 +112,7 @@ private:
     RuntimeProfile::Counter* _block_init_seek_counter = nullptr;
     RuntimeProfile::Counter* _block_conditions_filtered_timer = nullptr;
     RuntimeProfile::Counter* _first_read_timer = nullptr;
+    RuntimeProfile::Counter* _second_read_timer = nullptr;
     RuntimeProfile::Counter* _first_read_seek_timer = nullptr;
     RuntimeProfile::Counter* _first_read_seek_counter = nullptr;
     RuntimeProfile::Counter* _lazy_read_timer = nullptr;
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp 
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 1eea0afaa8..9a503bd16d 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -51,8 +51,13 @@ Status NewOlapScanner::prepare(const TPaloScanRange& 
scan_range,
                                VExprContext** vconjunct_ctx_ptr,
                                const std::vector<TCondition>& filters,
                                const FilterPredicates& filter_predicates,
-                               const std::vector<FunctionFilter>& 
function_filters) {
+                               const std::vector<FunctionFilter>& 
function_filters,
+                               VExprContext** common_vexpr_ctxs_pushdown) {
     RETURN_IF_ERROR(VScanner::prepare(_state, vconjunct_ctx_ptr));
+    if (common_vexpr_ctxs_pushdown != nullptr) {
+        // Copy common_vexpr_ctxs_pushdown from scan node to this scanner's 
_common_vexpr_ctxs_pushdown, just necessary.
+        RETURN_IF_ERROR((*common_vexpr_ctxs_pushdown)->clone(_state, 
&_common_vexpr_ctxs_pushdown));
+    }
 
     // set limit to reduce end of rowset and segment mem use
     _tablet_reader = std::make_unique<BlockReader>();
@@ -209,8 +214,14 @@ Status NewOlapScanner::_init_tablet_reader_params(
                 real_parent->_olap_scan_node.push_down_agg_type_opt;
     }
     _tablet_reader_params.version = Version(0, _version);
+    // TODO: If a new runtime filter arrives after `_vconjunct_ctx` is moved to 
`_common_vexpr_ctxs_pushdown`,
+    // `_vconjunct_ctx` and `_common_vexpr_ctxs_pushdown` will have values at 
the same time,
+    // and the root() of `_vconjunct_ctx` and `_common_vexpr_ctxs_pushdown` 
should be merged as `remaining_vconjunct_root`
     _tablet_reader_params.remaining_vconjunct_root =
-            (_vconjunct_ctx == nullptr) ? nullptr : _vconjunct_ctx->root();
+            (_common_vexpr_ctxs_pushdown == nullptr)
+                    ? (_vconjunct_ctx == nullptr ? nullptr : 
_vconjunct_ctx->root())
+                    : _common_vexpr_ctxs_pushdown->root();
+    _tablet_reader_params.common_vexpr_ctxs_pushdown = 
_common_vexpr_ctxs_pushdown;
     _tablet_reader_params.output_columns = 
((NewOlapScanNode*)_parent)->_maybe_read_column_ids;
 
     // Condition
@@ -471,12 +482,14 @@ void NewOlapScanner::_update_counters_before_close() {
     _raw_rows_read += _tablet_reader->mutable_stats()->raw_rows_read;
     COUNTER_UPDATE(olap_parent->_vec_cond_timer, stats.vec_cond_ns);
     COUNTER_UPDATE(olap_parent->_short_cond_timer, stats.short_cond_ns);
+    COUNTER_UPDATE(olap_parent->_expr_filter_timer, stats.expr_filter_ns);
     COUNTER_UPDATE(olap_parent->_block_init_timer, stats.block_init_ns);
     COUNTER_UPDATE(olap_parent->_block_init_seek_timer, 
stats.block_init_seek_ns);
     COUNTER_UPDATE(olap_parent->_block_init_seek_counter, 
stats.block_init_seek_num);
     COUNTER_UPDATE(olap_parent->_block_conditions_filtered_timer,
                    stats.block_conditions_filtered_ns);
     COUNTER_UPDATE(olap_parent->_first_read_timer, stats.first_read_ns);
+    COUNTER_UPDATE(olap_parent->_second_read_timer, stats.second_read_ns);
     COUNTER_UPDATE(olap_parent->_first_read_seek_timer, 
stats.block_first_read_seek_ns);
     COUNTER_UPDATE(olap_parent->_first_read_seek_counter, 
stats.block_first_read_seek_num);
     COUNTER_UPDATE(olap_parent->_lazy_read_timer, stats.lazy_read_ns);
diff --git a/be/src/vec/exec/scan/new_olap_scanner.h 
b/be/src/vec/exec/scan/new_olap_scanner.h
index 1b3bfcb364..83968e2535 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.h
+++ b/be/src/vec/exec/scan/new_olap_scanner.h
@@ -44,7 +44,8 @@ public:
     Status prepare(const TPaloScanRange& scan_range, const 
std::vector<OlapScanRange*>& key_ranges,
                    VExprContext** vconjunct_ctx_ptr, const 
std::vector<TCondition>& filters,
                    const FilterPredicates& filter_predicates,
-                   const std::vector<FunctionFilter>& function_filters);
+                   const std::vector<FunctionFilter>& function_filters,
+                   VExprContext** common_vexpr_ctxs_pushdown);
 
     const std::string& scan_disk() const { return _tablet->data_dir()->path(); 
}
 
diff --git a/be/src/vec/exec/scan/vscan_node.cpp 
b/be/src/vec/exec/scan/vscan_node.cpp
index 6093c6314d..e7b46ca659 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -388,6 +388,9 @@ void VScanNode::release_resource(RuntimeState* state) {
     for (auto& ctx : _stale_vexpr_ctxs) {
         (*ctx)->close(state);
     }
+    if (_common_vexpr_ctxs_pushdown) {
+        (*_common_vexpr_ctxs_pushdown)->close(state);
+    }
     _scanner_pool.clear();
 
     ExecNode::release_resource(state);
@@ -457,7 +460,11 @@ Status VScanNode::_normalize_conjuncts() {
             
RETURN_IF_ERROR(_normalize_predicate((*_vconjunct_ctx_ptr)->root(), &new_root));
             if (new_root) {
                 (*_vconjunct_ctx_ptr)->set_root(new_root);
-            } else {
+                if (_should_push_down_common_expr()) {
+                    _common_vexpr_ctxs_pushdown = 
std::move(_vconjunct_ctx_ptr);
+                    _vconjunct_ctx_ptr.reset(nullptr);
+                }
+            } else { // All conjuncts are pushed down as predicate column
                 _stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr));
                 _vconjunct_ctx_ptr.reset(nullptr);
             }
diff --git a/be/src/vec/exec/scan/vscan_node.h 
b/be/src/vec/exec/scan/vscan_node.h
index 2d6e287af0..23b1e59b51 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -164,6 +164,8 @@ protected:
         return Status::OK();
     }
 
+    virtual bool _should_push_down_common_expr() { return false; }
+
     virtual PushDownType _should_push_down_bloom_filter() { return 
PushDownType::UNACCEPTABLE; }
 
     virtual PushDownType _should_push_down_bitmap_filter() { return 
PushDownType::UNACCEPTABLE; }
@@ -259,6 +261,7 @@ protected:
     // Every time vconjunct_ctx_ptr is updated, the old ctx will be stored in 
this vector
     // so that it will be destroyed uniformly at the end of the query.
     std::vector<std::unique_ptr<VExprContext*>> _stale_vexpr_ctxs;
+    std::unique_ptr<VExprContext*> _common_vexpr_ctxs_pushdown = nullptr;
 
     // If sort info is set, push limit to each scanner;
     int64_t _limit_per_scanner = -1;
diff --git a/be/src/vec/exec/scan/vscanner.cpp 
b/be/src/vec/exec/scan/vscanner.cpp
index 68dbbbfe9e..180b573614 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -78,9 +78,9 @@ Status VScanner::get_block(RuntimeState* state, Block* block, 
bool* eof) {
             {
                 SCOPED_TIMER(_parent->_filter_timer);
                 RETURN_IF_ERROR(_filter_output_block(block));
-                // record rows return (after filter) for _limit check
-                _num_rows_return += block->rows();
             }
+            // record rows return (after filter) for _limit check
+            _num_rows_return += block->rows();
         } while (!state->is_cancelled() && block->rows() == 0 && !(*eof) &&
                  _num_rows_read < rows_read_threshold);
     }
@@ -141,6 +141,9 @@ Status VScanner::close(RuntimeState* state) {
     if (_vconjunct_ctx) {
         _vconjunct_ctx->close(state);
     }
+    if (_common_vexpr_ctxs_pushdown) {
+        _common_vexpr_ctxs_pushdown->close(state);
+    }
 
     COUNTER_UPDATE(_parent->_scanner_wait_worker_timer, 
_scanner_wait_worker_timer);
     _is_closed = true;
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index e36968e6f9..88cac3db42 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -153,6 +153,7 @@ protected:
     // Cloned from _vconjunct_ctx of scan node.
     // It includes predicate in SQL and runtime filters.
     VExprContext* _vconjunct_ctx = nullptr;
+    VExprContext* _common_vexpr_ctxs_pushdown = nullptr;
     // Late arriving runtime filters will update _vconjunct_ctx.
     // The old _vconjunct_ctx will be temporarily placed in _stale_vexpr_ctxs
     // and will be destroyed at the end.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 4d7cb98994..1133cf6807 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -229,6 +229,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String ENABLE_FUNCTION_PUSHDOWN = 
"enable_function_pushdown";
 
+    public static final String ENABLE_COMMON_EXPR_PUSHDOWN = 
"enable_common_expr_pushdown";
+
     public static final String FRAGMENT_TRANSMISSION_COMPRESSION_CODEC = 
"fragment_transmission_compression_codec";
 
     public static final String ENABLE_LOCAL_EXCHANGE = "enable_local_exchange";
@@ -643,6 +645,9 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = ENABLE_FUNCTION_PUSHDOWN)
     public boolean enableFunctionPushdown = true;
 
+    @VariableMgr.VarAttr(name = ENABLE_COMMON_EXPR_PUSHDOWN, fuzzy = true)
+    public boolean enableCommonExprPushdown = true;
+
     @VariableMgr.VarAttr(name = ENABLE_LOCAL_EXCHANGE, fuzzy = true)
     public boolean enableLocalExchange = true;
 
@@ -765,6 +770,7 @@ public class SessionVariable implements Serializable, 
Writable {
     public void initFuzzyModeVariables() {
         Random random = new Random(System.currentTimeMillis());
         this.parallelExecInstanceNum = random.nextInt(8) + 1;
+        this.enableCommonExprPushdown = random.nextBoolean();
         this.enableLocalExchange = random.nextBoolean();
         // This will cause be dead loop, disable it first
         // this.disableJoinReorder = random.nextBoolean();
@@ -1609,6 +1615,7 @@ public class SessionVariable implements Serializable, 
Writable {
         }
 
         tResult.setEnableFunctionPushdown(enableFunctionPushdown);
+        tResult.setEnableCommonExprPushdown(enableCommonExprPushdown);
         tResult.setCheckOverflowForDecimal(checkOverflowForDecimal);
         
tResult.setFragmentTransmissionCompressionCodec(fragmentTransmissionCompressionCodec);
         tResult.setEnableLocalExchange(enableLocalExchange);
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index 578701004e..cafcf1044d 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -202,6 +202,8 @@ struct TQueryOptions {
   // For debug purpose, skip delete bitmap when reading data
   63: optional bool skip_delete_bitmap = false
   64: optional bool dry_run_query = false
+
+  65: optional bool enable_common_expr_pushdown = false;
 }
     
 
diff --git a/regression-test/data/correctness/test_pushdown_common_expr.out 
b/regression-test/data/correctness/test_pushdown_common_expr.out
new file mode 100644
index 0000000000..e50a453c5c
--- /dev/null
+++ b/regression-test/data/correctness/test_pushdown_common_expr.out
@@ -0,0 +1,49 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !1 --
+1      a       aa
+2      b       bb
+4      c       cc
+8      d       dd
+
+-- !2 --
+64     g       gg
+
+-- !3 --
+256    i       ii
+
+-- !4 --
+1      a       aa
+128    h       hh
+16     e       ee
+2      b       bb
+256    i       ii
+32     f       ff
+4      c       cc
+64     g       gg
+8      d       dd
+
+-- !1 --
+1      a       aa
+128    h       hh
+2      b       bb
+4      c       cc
+8      d       dd
+
+-- !2 --
+64     g       gg
+
+-- !3 --
+256    i       ii
+
+-- !4 --
+1      a       aa
+1024   k       kk
+128    h       hh
+16     e       ee
+2      b       bb
+256    i       ii
+32     f       ff
+4      c       cc
+64     g       gg
+8      d       dd
+
diff --git 
a/regression-test/suites/correctness/test_pushdown_common_expr.groovy 
b/regression-test/suites/correctness/test_pushdown_common_expr.groovy
new file mode 100644
index 0000000000..8819288e10
--- /dev/null
+++ b/regression-test/suites/correctness/test_pushdown_common_expr.groovy
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_pushdown_common_expr") {
+    sql """ DROP TABLE IF EXISTS t_pushdown_common_expr """
+    sql """
+    CREATE TABLE `t_pushdown_common_expr` (
+        `c1` int(11) NULL,
+        `c2` varchar(100) NULL COMMENT "",
+        `c3` varchar(100) NULL COMMENT ""
+    ) ENGINE=OLAP
+    DUPLICATE KEY(`c1`)
+    COMMENT 'OLAP'
+    DISTRIBUTED BY HASH(`c1`) BUCKETS 1
+    PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1"
+    );
+    """
+
+    sql """
+        INSERT INTO t_pushdown_common_expr VALUES
+        (1,'a','aa'),
+        (2,'b','bb'),
+        (4,'c','cc'),
+        (8,'d','dd'),
+        (16,'e','ee'),
+        (32,'f','ff'),
+        (64,'g','gg'),
+        (128,'h','hh'),
+        (256,'i','ii'),
+        (512,'j','jj'),
+        (1024,'k','kk');
+     """
+
+    sql """set batch_size=8"""
+    sql """set enable_common_expr_pushdown=true"""
+
+    order_qt_1 """
+        SELECT * FROM t_pushdown_common_expr WHERE c3 LIKE "%c%" OR c1 < 10;
+    """
+
+    order_qt_2 """
+        SELECT * FROM t_pushdown_common_expr WHERE UPPER(c2)="G" OR UPPER(c2)="P";
+    """
+
+    order_qt_3 """
+        SELECT * FROM t_pushdown_common_expr WHERE c1 = 256 OR c1 = 100;
+    """
+
+    order_qt_4 """
+        SELECT * FROM t_pushdown_common_expr WHERE c1 < 300 OR UPPER(c2)="F" OR c3 LIKE "%f%";
+    """
+
+    sql """set enable_common_expr_pushdown=false"""
+
+    order_qt_1 """
+        SELECT * FROM t_pushdown_common_expr WHERE c3 LIKE "%h%" OR c1 < 10;
+    """
+
+    order_qt_2 """
+        SELECT * FROM t_pushdown_common_expr WHERE UPPER(c2)="G" OR UPPER(c2)="P";
+    """
+
+    order_qt_3 """
+        SELECT * FROM t_pushdown_common_expr WHERE c1 = 256 OR c1 = 100;
+    """
+
+    order_qt_4 """
+        SELECT * FROM t_pushdown_common_expr WHERE c1 < 300 OR UPPER(c2)="K" OR c3 LIKE "%k%";
+    """
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to