This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 05da3d947f [feature-wip](new-scan) add scanner scheduling framework 
(#11582)
05da3d947f is described below

commit 05da3d947f63766b2ac2d6e430a4b2467ea17042
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Tue Aug 23 08:45:18 2022 +0800

    [feature-wip](new-scan) add scanner scheduling framework (#11582)
    
    There are currently many types of ScanNodes in Doris, and most of their logic
    is the same, including:
    
    - Runtime filters
    - Predicate pushdown
    - Scanner generation and scheduling
    
    So I intend to unify the common logic of all ScanNodes. Different data sources
    then only need to implement their own Scanners for data access, so that future
    scan optimizations can be applied to all data sources, while also reducing
    code duplication.
    
    This PR mainly adds 4 new classes:
    
    VScanner
    The parent class of all Scanners. Subclasses inherit from it to implement
    specific data access methods.
    
    VScanNode
    The unified ScanNode, responsible for common logic including RuntimeFilter,
    predicate pushdown, and Scanner generation and scheduling.
    
    ScannerContext
    ScannerContext is responsible for recording the execution status
    of a group of Scanners corresponding to a ScanNode, including how many
    scanners are being scheduled. It also maintains a producer-consumer
    queue of blocks between the scanners and the scan node.
    
    ScannerContext is also the scheduling unit of ScannerScheduler:
    ScannerScheduler schedules one ScannerContext at a time
    and submits its Scanners to the scanner thread pool for data scanning.
    
    ScannerScheduler
    Responsible for scheduling all Scanner tasks in a unified way.
    
    Test:
    This work is still in progress and is disabled by default.
    I tested it with JMeter at a concurrency of 50, but currently the scanner
    just returns without reading any data; the QPS reaches about 9000.
    I can't compare it with the original implementation because no data is read
    for now. I will test it when the new olap scanner is ready.
    Co-authored-by: morningman <morning...@apache.org>
---
 be/src/common/config.h                             |   4 +
 be/src/exec/exec_node.cpp                          |  34 +-
 be/src/exec/olap_scan_node.cpp                     |   8 +-
 be/src/runtime/exec_env.h                          |   5 +-
 be/src/runtime/exec_env_init.cpp                   |   6 +
 be/src/runtime/fragment_mgr.cpp                    |  25 +-
 be/src/runtime/plan_fragment_executor.cpp          |  20 +-
 be/src/runtime/query_fragments_ctx.h               |  22 +-
 be/src/vec/CMakeLists.txt                          |  13 +-
 be/src/vec/exec/scan/new_olap_scan_node.cpp        | 307 ++++++++
 be/src/vec/exec/scan/new_olap_scan_node.h          |  64 ++
 be/src/vec/exec/scan/new_olap_scanner.cpp          | 325 ++++++++
 be/src/vec/exec/scan/new_olap_scanner.h            |  79 ++
 be/src/vec/exec/scan/scanner_context.cpp           | 232 ++++++
 be/src/vec/exec/scan/scanner_context.h             | 205 +++++
 be/src/vec/exec/scan/scanner_scheduler.cpp         | 253 ++++++
 be/src/vec/exec/scan/scanner_scheduler.h           |  94 +++
 be/src/vec/exec/scan/vscan_node.cpp                | 864 +++++++++++++++++++++
 be/src/vec/exec/scan/vscan_node.h                  | 246 ++++++
 be/src/vec/exec/scan/vscanner.cpp                  | 155 ++++
 be/src/vec/exec/scan/vscanner.h                    | 140 ++++
 be/src/vec/exec/volap_scan_node.cpp                |   7 +-
 be/src/vec/olap/block_reader.cpp                   |   3 +
 be/src/vec/sink/vdata_stream_sender.h              |   2 +-
 .../org/apache/doris/regression/Config.groovy      |   2 +-
 25 files changed, 3060 insertions(+), 55 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 44749c095c..10aeeff6d5 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -840,6 +840,10 @@ CONF_Int32(doris_remote_scanner_thread_pool_thread_num, 
"16");
 // number of s3 scanner thread pool queue size
 CONF_Int32(doris_remote_scanner_thread_pool_queue_size, "10240");
 
+// If set to true, the new scan node framework will be used.
+// This config should be removed when the new scan node is ready.
+CONF_Bool(enable_new_scan_node, "false");
+
 #ifdef BE_TEST
 // test s3
 CONF_String(test_s3_resource, "resource");
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 5c4981c739..66827019f9 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -62,6 +62,7 @@
 #include "vec/core/block.h"
 #include "vec/exec/file_scan_node.h"
 #include "vec/exec/join/vhash_join_node.h"
+#include "vec/exec/scan/new_olap_scan_node.h"
 #include "vec/exec/vaggregation_node.h"
 #include "vec/exec/vanalytic_eval_node.h"
 #include "vec/exec/vassert_num_rows_node.h"
@@ -213,12 +214,15 @@ Status ExecNode::prepare(RuntimeState* state) {
     if (_vconjunct_ctx_ptr) {
         RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->prepare(state, 
_row_descriptor));
     }
-    if (typeid(*this) != typeid(doris::vectorized::VOlapScanNode)) {
+
+    // For vectorized olap scan node, the conjuncts is prepared in 
_vconjunct_ctx_ptr.
+    // And _conjunct_ctxs is useless.
+    // TODO: Should be removed when non-vec engine is removed.
+    if (typeid(*this) != typeid(doris::vectorized::VOlapScanNode) &&
+        typeid(*this) != typeid(doris::vectorized::NewOlapScanNode)) {
         RETURN_IF_ERROR(Expr::prepare(_conjunct_ctxs, state, _row_descriptor));
     }
 
-    // TODO(zc):
-    // AddExprCtxsToFree(_conjunct_ctxs);
     for (int i = 0; i < _children.size(); ++i) {
         RETURN_IF_ERROR(_children[i]->prepare(state));
     }
@@ -231,7 +235,8 @@ Status ExecNode::open(RuntimeState* state) {
     if (_vconjunct_ctx_ptr) {
         RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->open(state));
     }
-    if (typeid(*this) != typeid(doris::vectorized::VOlapScanNode)) {
+    if (typeid(*this) != typeid(doris::vectorized::VOlapScanNode) &&
+        typeid(*this) != typeid(doris::vectorized::NewOlapScanNode)) {
         return Expr::open(_conjunct_ctxs, state);
     } else {
         return Status::OK();
@@ -275,7 +280,8 @@ Status ExecNode::close(RuntimeState* state) {
     if (_vconjunct_ctx_ptr) {
         (*_vconjunct_ctx_ptr)->close(state);
     }
-    if (typeid(*this) != typeid(doris::vectorized::VOlapScanNode)) {
+    if (typeid(*this) != typeid(doris::vectorized::VOlapScanNode) &&
+        typeid(*this) != typeid(doris::vectorized::NewOlapScanNode)) {
         Expr::close(_conjunct_ctxs, state);
     }
 
@@ -453,7 +459,11 @@ Status ExecNode::create_node(RuntimeState* state, 
ObjectPool* pool, const TPlanN
 
     case TPlanNodeType::OLAP_SCAN_NODE:
         if (state->enable_vectorized_exec()) {
-            *node = pool->add(new vectorized::VOlapScanNode(pool, tnode, 
descs));
+            if (config::enable_new_scan_node) {
+                *node = pool->add(new vectorized::NewOlapScanNode(pool, tnode, 
descs));
+            } else {
+                *node = pool->add(new vectorized::VOlapScanNode(pool, tnode, 
descs));
+            }
         } else {
             *node = pool->add(new OlapScanNode(pool, tnode, descs));
         }
@@ -682,8 +692,16 @@ void ExecNode::try_do_aggregate_serde_improve() {
         return;
     }
 
-    ScanNode* scan_node = static_cast<ScanNode*>(agg_node[0]->_children[0]);
-    scan_node->set_no_agg_finalize();
+    // TODO(cmy): should be removed when NewOlapScanNode is ready
+    ExecNode* child0 = agg_node[0]->_children[0];
+    if (typeid(*child0) == typeid(vectorized::NewOlapScanNode)) {
+        vectorized::VScanNode* scan_node =
+                static_cast<vectorized::VScanNode*>(agg_node[0]->_children[0]);
+        scan_node->set_no_agg_finalize();
+    } else {
+        ScanNode* scan_node = 
static_cast<ScanNode*>(agg_node[0]->_children[0]);
+        scan_node->set_no_agg_finalize();
+    }
 }
 
 void ExecNode::init_runtime_profile(const std::string& name) {
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index 99caa8afb9..c6759b2727 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -1491,13 +1491,6 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
         }
     }
 
-    ThreadPoolToken* thread_token = nullptr;
-    if (limit() != -1 && limit() < 1024) {
-        thread_token = state->get_query_fragments_ctx()->get_serial_token();
-    } else {
-        thread_token = state->get_query_fragments_ctx()->get_token();
-    }
-
     /*********************************
      * The basic strategy of priority scheduling:
      * 1. Determine the initial nice value by querying the number of split 
ranges
@@ -1508,6 +1501,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
      *    The larger the nice value, the more preferentially obtained query 
resources
      * 4. Regularly increase the priority of the remaining tasks in the queue 
to avoid starvation for large queries
      *********************************/
+    ThreadPoolToken* thread_token = 
state->get_query_fragments_ctx()->get_token();
     PriorityThreadPool* thread_pool = state->exec_env()->scan_thread_pool();
     PriorityThreadPool* remote_thread_pool = 
state->exec_env()->remote_scan_thread_pool();
     _total_assign_num = 0;
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 6a3b71ca4d..bb10b75aaa 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -24,7 +24,8 @@
 namespace doris {
 namespace vectorized {
 class VDataStreamMgr;
-}
+class ScannerScheduler;
+} // namespace vectorized
 class BfdParser;
 class BrokerMgr;
 
@@ -155,6 +156,7 @@ public:
     StreamLoadExecutor* stream_load_executor() { return _stream_load_executor; 
}
     RoutineLoadTaskExecutor* routine_load_task_executor() { return 
_routine_load_task_executor; }
     HeartbeatFlags* heartbeat_flags() { return _heartbeat_flags; }
+    doris::vectorized::ScannerScheduler* scanner_scheduler() { return 
_scanner_scheduler; }
 
 private:
     Status _init(const std::vector<StorePath>& store_paths);
@@ -232,6 +234,7 @@ private:
     SmallFileMgr* _small_file_mgr = nullptr;
     HeartbeatFlags* _heartbeat_flags = nullptr;
     StoragePolicyMgr* _storage_policy_mgr = nullptr;
+    doris::vectorized::ScannerScheduler* _scanner_scheduler = nullptr;
 };
 
 template <>
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 360e2e7a54..3057681596 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -58,6 +58,7 @@
 #include "util/pretty_printer.h"
 #include "util/priority_thread_pool.hpp"
 #include "util/priority_work_stealing_thread_pool.hpp"
+#include "vec/exec/scan/scanner_scheduler.h"
 #include "vec/runtime/vdata_stream_mgr.h"
 
 #if !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && 
!defined(LEAK_SANITIZER) && \
@@ -129,6 +130,8 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths) {
             .set_max_queue_size(config::send_batch_thread_pool_queue_size)
             .build(&_send_batch_thread_pool);
 
+    _scanner_scheduler = new doris::vectorized::ScannerScheduler();
+
     _cgroups_mgr = new CgroupsMgr(this, config::doris_cgroups);
     _fragment_mgr = new FragmentMgr(this);
     _result_cache = new ResultCache(config::query_cache_max_size_mb,
@@ -160,6 +163,8 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths) {
     }
     _broker_mgr->init();
     _small_file_mgr->init();
+    _scanner_scheduler->init(this);
+
     _init_mem_tracker();
 
     RETURN_IF_ERROR(
@@ -360,6 +365,7 @@ void ExecEnv::_destroy() {
     SAFE_DELETE(_heartbeat_flags);
     SAFE_DELETE(_task_pool_mem_tracker_registry);
     SAFE_DELETE(_buffer_reservation);
+    SAFE_DELETE(_scanner_scheduler);
 
     DEREGISTER_HOOK_METRIC(query_mem_consumption);
     DEREGISTER_HOOK_METRIC(load_mem_consumption);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 7b60e9d406..60e4d2ffc4 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -245,6 +245,7 @@ Status FragmentExecState::execute() {
         CgroupsMgr::apply_system_cgroup();
         WARN_IF_ERROR(_executor.open(), strings::Substitute("Got error while 
opening fragment $0",
                                                             
print_id(_fragment_instance_id)));
+
         _executor.close();
     }
     DorisMetrics::instance()->fragment_requests_total->increment(1);
@@ -635,21 +636,33 @@ Status FragmentMgr::exec_plan_fragment(const 
TExecPlanFragmentParams& params, Fi
             fragments_ctx->set_rsc_info = true;
         }
 
-        if (params.__isset.query_options) {
-            fragments_ctx->timeout_second = params.query_options.query_timeout;
-            if (params.query_options.__isset.resource_limit) {
-                
fragments_ctx->set_thread_token(params.query_options.resource_limit.cpu_limit);
-            }
+        fragments_ctx->timeout_second = params.query_options.query_timeout;
+
+#ifndef BE_TEST
+        // set thread token
+        // the thread token will be set if
+        // 1. the cpu_limit is set, or
+        // 2. the limit is very small ( < 1024)
+        int concurrency = 1;
+        bool is_serial = false;
+        if (params.query_options.__isset.resource_limit &&
+            params.query_options.resource_limit.__isset.cpu_limit) {
+            concurrency = params.query_options.resource_limit.cpu_limit;
+        } else {
+            concurrency = config::doris_scanner_thread_pool_thread_num;
         }
         if (params.__isset.fragment && params.fragment.__isset.plan &&
             params.fragment.plan.nodes.size() > 0) {
             for (auto& node : params.fragment.plan.nodes) {
                 if (node.limit > 0 && node.limit < 1024) {
-                    fragments_ctx->set_serial_thread_token();
+                    concurrency = 1;
+                    is_serial = true;
                     break;
                 }
             }
         }
+        fragments_ctx->set_thread_token(concurrency, is_serial);
+#endif
 
         {
             // Find _fragments_ctx_map again, in case some other request has 
already
diff --git a/be/src/runtime/plan_fragment_executor.cpp 
b/be/src/runtime/plan_fragment_executor.cpp
index 6599785a41..90870b6a4e 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -47,6 +47,7 @@
 #include "util/telemetry/telemetry.h"
 #include "util/uid_util.h"
 #include "vec/core/block.h"
+#include "vec/exec/scan/new_olap_scan_node.h"
 #include "vec/exec/vexchange_node.h"
 #include "vec/runtime/vdata_stream_mgr.h"
 
@@ -165,11 +166,20 @@ Status PlanFragmentExecutor::prepare(const 
TExecPlanFragmentParams& request,
     _plan->try_do_aggregate_serde_improve();
 
     for (int i = 0; i < scan_nodes.size(); ++i) {
-        ScanNode* scan_node = static_cast<ScanNode*>(scan_nodes[i]);
-        const std::vector<TScanRangeParams>& scan_ranges =
-                find_with_default(params.per_node_scan_ranges, 
scan_node->id(), no_scan_ranges);
-        scan_node->set_scan_ranges(scan_ranges);
-        VLOG_CRITICAL << "scan_node_Id=" << scan_node->id() << " size=" << 
scan_ranges.size();
+        // TODO(cmy): this "if...else" should be removed once all ScanNode are 
derived from VScanNode.
+        ExecNode* node = scan_nodes[i];
+        if (typeid(*node) == typeid(vectorized::NewOlapScanNode)) {
+            vectorized::VScanNode* scan_node = 
static_cast<vectorized::VScanNode*>(scan_nodes[i]);
+            const std::vector<TScanRangeParams>& scan_ranges =
+                    find_with_default(params.per_node_scan_ranges, 
scan_node->id(), no_scan_ranges);
+            scan_node->set_scan_ranges(scan_ranges);
+        } else {
+            ScanNode* scan_node = static_cast<ScanNode*>(scan_nodes[i]);
+            const std::vector<TScanRangeParams>& scan_ranges =
+                    find_with_default(params.per_node_scan_ranges, 
scan_node->id(), no_scan_ranges);
+            scan_node->set_scan_ranges(scan_ranges);
+            VLOG_CRITICAL << "scan_node_Id=" << scan_node->id() << " size=" << 
scan_ranges.size();
+        }
     }
 
     _runtime_state->set_per_fragment_instance_idx(params.sender_id);
diff --git a/be/src/runtime/query_fragments_ctx.h 
b/be/src/runtime/query_fragments_ctx.h
index 78d84cb4b2..80a07f47e4 100644
--- a/be/src/runtime/query_fragments_ctx.h
+++ b/be/src/runtime/query_fragments_ctx.h
@@ -52,22 +52,15 @@ public:
         return false;
     }
 
-    void set_thread_token(int cpu_limit) {
-        if (cpu_limit > 0) {
-            // For now, cpu_limit will be the max concurrency of the scan 
thread pool token.
-            _thread_token = _exec_env->limited_scan_thread_pool()->new_token(
-                    ThreadPool::ExecutionMode::CONCURRENT, cpu_limit);
-        }
-    }
-    void set_serial_thread_token() {
-        _serial_thread_token = 
_exec_env->limited_scan_thread_pool()->new_token(
-                ThreadPool::ExecutionMode::SERIAL, 1);
+    void set_thread_token(int concurrency, bool is_serial) {
+        _thread_token = _exec_env->limited_scan_thread_pool()->new_token(
+                is_serial ? ThreadPool::ExecutionMode::SERIAL
+                          : ThreadPool::ExecutionMode::CONCURRENT,
+                concurrency);
     }
 
     ThreadPoolToken* get_token() { return _thread_token.get(); }
 
-    ThreadPoolToken* get_serial_token() { return _serial_thread_token.get(); }
-
     void set_ready_to_execute() {
         {
             std::lock_guard<std::mutex> l(_start_lock);
@@ -114,11 +107,6 @@ private:
     // If this token is not set, the scanner will be executed in 
"_scan_thread_pool" in exec env.
     std::unique_ptr<ThreadPoolToken> _thread_token;
 
-    // A token used to submit olap scanner to the "_limited_scan_thread_pool" 
serially, it used for
-    // query like `select * limit 1`, this query used for limit the max scaner 
thread to 1 to avoid
-    // this query cost too much resource
-    std::unique_ptr<ThreadPoolToken> _serial_thread_token;
-
     std::mutex _start_lock;
     std::condition_variable _start_cond;
     // Only valid when _need_wait_execution_trigger is set to true in 
FragmentExecState.
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 8743013590..f1fc4b4ec2 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -232,8 +232,15 @@ set(VEC_FILES
   exec/format/parquet/schema_desc.cpp
   exec/format/parquet/vparquet_column_reader.cpp
   exec/format/parquet/level_decoder.cpp
-  exec/format/parquet/parquet_common.cpp)
+  exec/format/parquet/parquet_common.cpp
+  exec/scan/vscan_node.cpp
+  exec/scan/vscanner.cpp
+  exec/scan/scanner_context.cpp
+  exec/scan/scanner_scheduler.cpp
+  exec/scan/new_olap_scan_node.cpp
+  exec/scan/new_olap_scanner.cpp
+)
 
 add_library(Vec STATIC
-    ${VEC_FILES}
-)
+        ${VEC_FILES}
+        )
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp 
b/be/src/vec/exec/scan/new_olap_scan_node.cpp
new file mode 100644
index 0000000000..bd48e288ec
--- /dev/null
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -0,0 +1,307 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/scan/new_olap_scan_node.h"
+
+#include "olap/storage_engine.h"
+#include "olap/tablet.h"
+#include "vec/columns/column_const.h"
+#include "vec/exec/scan/new_olap_scanner.h"
+#include "vec/functions/in.h"
+
+namespace doris::vectorized {
+
+NewOlapScanNode::NewOlapScanNode(ObjectPool* pool, const TPlanNode& tnode,
+                                 const DescriptorTbl& descs)
+        : VScanNode(pool, tnode, descs), _olap_scan_node(tnode.olap_scan_node) {
+    const auto& olap_node = tnode.olap_scan_node;
+    _output_tuple_id = olap_node.tuple_id;
+
+    // A per-scanner row limit is taken from `sort_limit`, but only when both
+    // `sort_info` and `sort_limit` are present in the plan.
+    if (!olap_node.__isset.sort_info || !olap_node.__isset.sort_limit) {
+        return;
+    }
+    _limit_per_scanner = olap_node.sort_limit;
+}
+
+Status NewOlapScanNode::prepare(RuntimeState* state) {
+    // Generic scan-node preparation comes first; any failure aborts here.
+    RETURN_IF_ERROR(VScanNode::prepare(state));
+
+    // A single tracker handed to every NewOlapScanner created in _init_scanners.
+    _scanner_mem_tracker = std::make_unique<MemTracker>("OlapScanners");
+    return Status::OK();
+}
+
+Status NewOlapScanNode::_init_profile() {
+    // No olap-specific profile counters are registered yet.
+    return Status::OK();
+}
+
+Status NewOlapScanNode::_process_conjuncts() {
+    RETURN_IF_ERROR(VScanNode::_process_conjuncts());
+    // Key ranges and olap filters are only built while the node is not yet at eos
+    // (presumably `_eos` can be set by the base-class conjunct processing).
+    if (!_eos) {
+        RETURN_IF_ERROR(_build_key_ranges_and_filters());
+    }
+    return Status::OK();
+}
+
+// Turns the per-column value ranges in `_colname_to_value_range` into:
+//  1. olap scan keys (`_scan_keys`), built from a prefix of the key columns, and
+//  2. olap filter conditions (`_olap_filters`) for every column range that was not
+//     converted into the scan keys exactly.
+Status NewOlapScanNode::_build_key_ranges_and_filters() {
+    const std::vector<std::string>& column_names = _olap_scan_node.key_column_name;
+    const std::vector<TPrimitiveType::type>& column_types = _olap_scan_node.key_column_type;
+    DCHECK(column_types.size() == column_names.size());
+
+    // 1. Extend the scan keys column by column, in key-column order, stopping at the
+    //    first key column without a value range or once the keys hold a range value.
+    //    NOTE(review): `is_convertible` is enabled only when the node has no limit.
+    _scan_keys.set_is_convertible(limit() == -1);
+
+    // `exact_range` reports whether a column range was converted into `_scan_keys`
+    // exactly. If so, the range is dropped so it is not pushed down again as a filter.
+    bool exact_range = true;
+    for (size_t column_index = 0;
+         column_index < column_names.size() && !_scan_keys.has_range_value(); ++column_index) {
+        auto iter = _colname_to_value_range.find(column_names[column_index]);
+        if (_colname_to_value_range.end() == iter) {
+            break;
+        }
+
+        RETURN_IF_ERROR(std::visit(
+                [&](auto&& range) {
+                    return _scan_keys.extend_scan_key(range, _max_scan_key_num, &exact_range);
+                },
+                iter->second));
+        // Erase only after the visitor has returned, so the variant that `range`
+        // was bound to is never destroyed while the visitor is still running.
+        if (exact_range) {
+            _colname_to_value_range.erase(iter);
+        }
+    }
+
+    // 2. Every remaining column range becomes olap filter conditions.
+    for (auto& entry : _colname_to_value_range) {
+        std::vector<TCondition> filters;
+        std::visit([&](auto&& range) { range.to_olap_filter(filters); }, entry.second);
+
+        // `filters` is a local, so its conditions are moved rather than copied
+        // (moving through a `const&` loop variable would silently copy).
+        for (auto& filter : filters) {
+            _olap_filters.push_back(std::move(filter));
+        }
+    }
+
+    // _runtime_profile->add_info_string("PushDownPredicate", olap_filters_to_string(_olap_filters));
+    // _runtime_profile->add_info_string("KeyRanges", _scan_keys.debug_string());
+    VLOG_CRITICAL << _scan_keys.debug_string();
+
+    return Status::OK();
+}
+
+// Decides whether the binary predicate `fn_call` can be pushed down. This requires that
+// `fn_checker` accepts the function name, that one child is a (possibly cast) slot ref,
+// and that the other child is a constant column.
+// On success, `*slot_ref_child` is the index of the slot-ref child and `*constant_val`
+// refers to the constant value. Returns false (outputs untouched) if no child is a slot ref.
+bool NewOlapScanNode::_should_push_down_binary_predicate(
+        VectorizedFnCall* fn_call, VExprContext* expr_ctx, StringRef* constant_val,
+        int* slot_ref_child, const std::function<bool(const std::string&)>& fn_checker) {
+    if (!fn_checker(fn_call->fn().name.function_name)) {
+        return false;
+    }
+
+    const auto& children = fn_call->children();
+    DCHECK(children.size() == 2);
+    bool found_slot_ref = false;
+    for (size_t i = 0; i < children.size(); i++) {
+        if (VExpr::expr_without_cast(children[i])->node_type() != TExprNodeType::SLOT_REF) {
+            // not a slot ref(column)
+            continue;
+        }
+        if (!children[1 - i]->is_constant()) {
+            // only handle constant value
+            return false;
+        }
+        const ColumnConst* const_column = check_and_get_column<ColumnConst>(
+                children[1 - i]->get_const_col(expr_ctx)->column_ptr);
+        if (const_column == nullptr) {
+            return false;
+        }
+        *slot_ref_child = i;
+        *constant_val = const_column->get_data_at(0);
+        found_slot_ref = true;
+    }
+    // Without a slot-ref child there is no column to push the predicate down to, and
+    // the output parameters were never written; previously this returned true.
+    return found_slot_ref;
+}
+
+// An IN / NOT IN predicate is pushed down only if its polarity matches `is_not_in`
+// and its value set has at most `_max_pushdown_conditions_per_column` elements.
+bool NewOlapScanNode::_should_push_down_in_predicate(VInPredicate* pred, VExprContext* expr_ctx,
+                                                     bool is_not_in) {
+    if (pred->is_not_in() != is_not_in) {
+        return false;
+    }
+
+    auto* fn_ctx = expr_ctx->fn_context(pred->fn_context_index());
+    auto* in_state =
+            static_cast<InState*>(fn_ctx->get_function_state(FunctionContext::FRAGMENT_LOCAL));
+    HybridSetBase* values = in_state->hybrid_set.get();
+
+    // Too many conditions pushed down to the storage engine may slow the query down,
+    // so large IN lists are not pushed down for this column at all. The threshold is
+    // an empirical value and may need tuning.
+    if (values->size() <= _max_pushdown_conditions_per_column) {
+        return true;
+    }
+    VLOG_NOTICE << "Predicate value num " << values->size() << " exceed limit "
+                << _max_pushdown_conditions_per_column;
+    return false;
+}
+
+bool NewOlapScanNode::_should_push_down_function_filter(VectorizedFnCall* 
fn_call,
+                                                        VExprContext* expr_ctx,
+                                                        StringVal* 
constant_str,
+                                                        
doris_udf::FunctionContext** fn_ctx) {
+    // Now only `like` function filters is supported to push down
+    if (fn_call->fn().name.function_name != "like") {
+        return false;
+    }
+
+    const auto& children = fn_call->children();
+    doris_udf::FunctionContext* func_cxt = 
expr_ctx->fn_context(fn_call->fn_context_index());
+    DCHECK(func_cxt != nullptr);
+    DCHECK(children.size() == 2);
+    for (size_t i = 0; i < children.size(); i++) {
+        if (VExpr::expr_without_cast(children[i])->node_type() != 
TExprNodeType::SLOT_REF) {
+            // not a slot ref(column)
+            continue;
+        }
+        if (!children[1 - i]->is_constant()) {
+            // only handle constant value
+            return false;
+        } else {
+            DCHECK(children[1 - i]->type().is_string_type());
+            if (const ColumnConst* const_column = 
check_and_get_column<ColumnConst>(
+                        children[1 - i]->get_const_col(expr_ctx)->column_ptr)) 
{
+                *constant_str = const_column->get_data_at(0).to_string_val();
+            } else {
+                return false;
+            }
+        }
+    }
+    *fn_ctx = func_cxt;
+    return true;
+}
+
+// Called by PlanFragmentExecutor to hand this node its scan ranges.
+// A Doris scan range is defined in thrift as:
+// struct TPaloScanRange {
+//  1: required list<Types.TNetworkAddress> hosts
+//  2: required string schema_hash
+//  3: required string version
+//  5: required Types.TTabletId tablet_id
+//  6: required string db_name
+//  7: optional list<TKeyRange> partition_column_ranges
+//  8: optional string index_name
+//  9: optional string table_name
+//}
+// Each scan range refers to exactly one tablet (`tablet_id`), so a single olap scan
+// node may scan multiple tablets.
+void NewOlapScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
+    for (const TScanRangeParams& param : scan_ranges) {
+        DCHECK(param.scan_range.__isset.palo_scan_range);
+        _scan_ranges.emplace_back(new TPaloScanRange(param.scan_range.palo_scan_range));
+        // COUNTER_UPDATE(_tablet_counter, 1);
+    }
+    // telemetry::set_current_span_attribute(_tablet_counter);
+}
+
+// Creates and prepares the scanners for this node and appends them to `scanners`.
+// Scanners are owned by `_scanner_pool`; `scanners` only receives raw pointers.
+// With no scan ranges, `_eos` is set and no scanner is created.
+Status NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) {
+    if (_scan_ranges.empty()) {
+        _eos = true;
+        return Status::OK();
+    }
+    auto span = opentelemetry::trace::Tracer::GetCurrentSpan();
+
+    // ranges constructed from scan keys
+    std::vector<std::unique_ptr<doris::OlapScanRange>> cond_ranges;
+    RETURN_IF_ERROR(_scan_keys.get_key_range(&cond_ranges));
+    // if we can't get ranges from conditions, we give it a total range
+    if (cond_ranges.empty()) {
+        cond_ranges.emplace_back(new doris::OlapScanRange());
+    }
+    // Up to 64 scanners are spread over the tablets, with at least one per tablet.
+    int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size());
+
+    // std::unordered_set<std::string> disk_set;
+    for (auto& scan_range : _scan_ranges) {
+        auto tablet_id = scan_range->tablet_id;
+        std::string err;
+        TabletSharedPtr tablet =
+                StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, true, &err);
+        if (tablet == nullptr) {
+            std::stringstream ss;
+            ss << "failed to get tablet: " << tablet_id << ", reason: " << err;
+            LOG(WARNING) << ss.str();
+            return Status::InternalError(ss.str());
+        }
+
+        std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = &cond_ranges;
+        int size_based_scanners_per_tablet = 1;
+
+        if (config::doris_scan_range_max_mb > 0) {
+            // One scanner per `doris_scan_range_max_mb` MB of tablet footprint.
+            // Computed in 64-bit with a parenthesized divisor: the previous
+            // `(int)footprint / max_mb << 20` parsed as `(footprint / max_mb) << 20`
+            // and truncated footprints above 2GB before dividing.
+            const int64_t max_scan_range_bytes =
+                    static_cast<int64_t>(config::doris_scan_range_max_mb) << 20;
+            const int64_t tablet_bytes = static_cast<int64_t>(tablet->tablet_footprint());
+            // Clamping to scanners_per_tablet before narrowing to int is lossless:
+            // only min(scanners_per_tablet, size_based_scanners_per_tablet) is used.
+            size_based_scanners_per_tablet = static_cast<int>(std::clamp<int64_t>(
+                    tablet_bytes / max_scan_range_bytes, 1, scanners_per_tablet));
+        }
+
+        int ranges_per_scanner =
+                std::max(1, (int)ranges->size() /
+                                    std::min(scanners_per_tablet, size_based_scanners_per_tablet));
+        int num_ranges = ranges->size();
+        for (int i = 0; i < num_ranges;) {
+            // Group up to `ranges_per_scanner` consecutive ranges that share the same
+            // `end_include` flag into one scanner.
+            std::vector<doris::OlapScanRange*> scanner_ranges;
+            scanner_ranges.push_back((*ranges)[i].get());
+            ++i;
+            for (int j = 1; i < num_ranges && j < ranges_per_scanner &&
+                            (*ranges)[i]->end_include == (*ranges)[i - 1]->end_include;
+                 ++j, ++i) {
+                scanner_ranges.push_back((*ranges)[i].get());
+            }
+
+            NewOlapScanner* scanner = new NewOlapScanner(
+                    _state, this, _limit_per_scanner, _olap_scan_node.is_preaggregation,
+                    _need_agg_finalize, *scan_range, _scanner_mem_tracker.get());
+            // Add the scanner to the pool before prepare(), so that it is still
+            // destroyed automatically if prepare() fails.
+            _scanner_pool.add(scanner);
+            RETURN_IF_ERROR(scanner->prepare(*scan_range, scanner_ranges, _vconjunct_ctx_ptr.get(),
+                                             _olap_filters, _bloom_filters_push_down,
+                                             _push_down_functions));
+            scanners->push_back(scanner);
+            // disk_set.insert(scanner->scan_disk());
+        }
+    }
+
+    // COUNTER_SET(_num_disks_accessed_counter, static_cast<int64_t>(disk_set.size()));
+    // COUNTER_SET(_num_scanners, static_cast<int64_t>(_volap_scanners.size()));
+    // telemetry::set_span_attribute(span, _num_disks_accessed_counter);
+    // telemetry::set_span_attribute(span, _num_scanners);
+
+    // init progress
+    // std::stringstream ss;
+    // ss << "ScanThread complete (node=" << id() << "):";
+    // _progress = ProgressUpdater(ss.str(), _volap_scanners.size(), 1);
+    return Status::OK();
+}
+
+// Returns whether `key_name` is treated as a key column by this olap scan node.
+// In DUP_KEYS tables, and in UNIQUE_KEYS tables with merge-on-write enabled, every
+// column is treated as a key column; otherwise only the columns in `key_column_name` are.
+bool NewOlapScanNode::_is_key_column(const std::string& key_name) {
+    const auto keys_type = _olap_scan_node.keyType;
+    const bool merge_on_write = _olap_scan_node.__isset.enable_unique_key_merge_on_write &&
+                                _olap_scan_node.enable_unique_key_merge_on_write;
+    if (keys_type == TKeysType::DUP_KEYS ||
+        (keys_type == TKeysType::UNIQUE_KEYS && merge_on_write)) {
+        return true;
+    }
+
+    const auto& key_columns = _olap_scan_node.key_column_name;
+    return std::find(key_columns.begin(), key_columns.end(), key_name) != key_columns.end();
+}
+
+}; // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h 
b/be/src/vec/exec/scan/new_olap_scan_node.h
new file mode 100644
index 0000000000..508c8851d5
--- /dev/null
+++ b/be/src/vec/exec/scan/new_olap_scan_node.h
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "vec/exec/scan/vscan_node.h"
+
+namespace doris::vectorized {
+
+class NewOlapScanner;
+// Scan node for OLAP tablets built on the VScanNode framework. It keeps the
+// thrift TOlapScanNode plan and its TPaloScanRange scan ranges, and creates
+// NewOlapScanner instances in _init_scanners().
+class NewOlapScanNode : public VScanNode {
+public:
+    NewOlapScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+    // NewOlapScanner reads _olap_scan_node directly (columns_desc, sort_info).
+    friend class NewOlapScanner;
+
+    Status prepare(RuntimeState* state) override;
+
+    void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
+
+protected:
+    // Overrides of VScanNode hooks; their exact contracts are defined by VScanNode.
+    Status _init_profile() override;
+    Status _process_conjuncts() override;
+    // True for every column of DUP_KEYS tables and of UNIQUE_KEYS tables with
+    // merge-on-write enabled; otherwise only for the plan's key columns.
+    bool _is_key_column(const std::string& col_name) override;
+
+    // Push-down decision hooks for binary, IN and function predicates.
+    bool _should_push_down_binary_predicate(
+            VectorizedFnCall* fn_call, VExprContext* expr_ctx, StringRef* constant_val,
+            int* slot_ref_child,
+            const std::function<bool(const std::string&)>& fn_checker) override;
+
+    bool _should_push_down_in_predicate(VInPredicate* in_pred, VExprContext* expr_ctx,
+                                        bool is_not_in) override;
+
+    bool _should_push_down_function_filter(VectorizedFnCall* fn_call, VExprContext* expr_ctx,
+                                           StringVal* constant_str,
+                                           doris_udf::FunctionContext** fn_ctx) override;
+
+    // Creates and prepares NewOlapScanner instances, adding each to the node's
+    // scanner pool and appending it to `scanners`.
+    Status _init_scanners(std::list<VScanner*>* scanners) override;
+
+private:
+    Status _build_key_ranges_and_filters();
+
+private:
+    // Thrift plan of this node; read by NewOlapScanner as well.
+    TOlapScanNode _olap_scan_node;
+    // NOTE(review): presumably one entry per tablet assigned to this node — confirm.
+    std::vector<std::unique_ptr<TPaloScanRange>> _scan_ranges;
+    // NOTE(review): presumably filled by _build_key_ranges_and_filters() — confirm.
+    OlapScanKeys _scan_keys;
+
+    // NOTE(review): presumably the tracker handed to created scanners — confirm.
+    std::unique_ptr<MemTracker> _scanner_mem_tracker;
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp 
b/be/src/vec/exec/scan/new_olap_scanner.cpp
new file mode 100644
index 0000000000..3ae67a34be
--- /dev/null
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -0,0 +1,325 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/scan/new_olap_scanner.h"
+
+#include "olap/storage_engine.h"
+#include "vec/exec/scan/new_olap_scan_node.h"
+#include "vec/olap/block_reader.h"
+
+namespace doris::vectorized {
+
+// Builds a scanner bound to `parent`. Only the aggregation flags are recorded
+// here and an empty TabletSchema is allocated; the tablet, version and reader
+// are resolved later by prepare().
+// NOTE(review): `scan_range` is not used by the constructor; prepare() receives it again.
+NewOlapScanner::NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t limit,
+                               bool aggregation, bool need_agg_finalize,
+                               const TPaloScanRange& scan_range, MemTracker* tracker)
+        : VScanner(state, static_cast<VScanNode*>(parent), limit, tracker),
+          _aggregation(aggregation),
+          _need_agg_finalize(need_agg_finalize),
+          _tablet_schema(std::make_shared<TabletSchema>()),
+          _version(-1) {}
+
+// Prepares the scanner to read one tablet:
+//  1. clones the scan node's conjunct context (if any) into _vconjunct_ctx,
+//  2. creates the BlockReader, capping its batch size by the node's limit,
+//  3. resolves the tablet for `scan_range` and the schema to read with,
+//  4. under the tablet header read lock, captures the rowset readers for versions
+//     [0, scan_range.version] and builds the reader params.
+// Returns InternalError if the tablet, its latest rowset, or its rowset readers
+// cannot be obtained.
+Status NewOlapScanner::prepare(
+        const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges,
+        VExprContext** vconjunct_ctx_ptr, const std::vector<TCondition>& filters,
+        const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>& bloom_filters,
+        const std::vector<FunctionFilter>& function_filters) {
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
+
+    // Copy the scan node's conjunct context into this scanner's _vconjunct_ctx.
+    // The holder may be null when the node has no conjuncts; the context it points
+    // to is checked too, so a null context is never dereferenced.
+    if (vconjunct_ctx_ptr != nullptr && *vconjunct_ctx_ptr != nullptr) {
+        RETURN_IF_ERROR((*vconjunct_ctx_ptr)->clone(_state, &_vconjunct_ctx));
+    }
+
+    // Cap the batch size by the limit to reduce memory use at the end of rowsets
+    // and segments.
+    _tablet_reader = std::make_unique<BlockReader>();
+    _tablet_reader->set_batch_size(
+            _parent->limit() == -1
+                    ? _state->batch_size()
+                    : std::min(static_cast<int64_t>(_state->batch_size()), _parent->limit()));
+
+    // Get olap table
+    TTabletId tablet_id = scan_range.tablet_id;
+    // NOTE(review): the version string is parsed without validation; a malformed
+    // value silently yields 0 or a partial number.
+    _version = strtoul(scan_range.version.c_str(), nullptr, 10);
+    {
+        std::string err;
+        _tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, true, &err);
+        if (_tablet.get() == nullptr) {
+            std::stringstream ss;
+            ss << "failed to get tablet. tablet_id=" << tablet_id << ", reason=" << err;
+            LOG(WARNING) << ss.str();
+            return Status::InternalError(ss.str());
+        }
+        _tablet_schema->copy_from(*_tablet->tablet_schema());
+
+        TOlapScanNode& olap_scan_node = static_cast<NewOlapScanNode*>(_parent)->_olap_scan_node;
+        if (!olap_scan_node.columns_desc.empty() &&
+            olap_scan_node.columns_desc[0].col_unique_id >= 0) {
+            // The tablet object's schema may be outdated: to support lightweight
+            // schema change (adding / dropping columns) the schema is bound to the
+            // rowset, so use the schema that FE puts into the query plan instead.
+            _tablet_schema->clear_columns();
+            for (const auto& column_desc : olap_scan_node.columns_desc) {
+                _tablet_schema->append_column(TabletColumn(column_desc));
+            }
+        }
+        {
+            std::shared_lock rdlock(_tablet->get_header_lock());
+            const RowsetSharedPtr rowset = _tablet->rowset_with_max_version();
+            if (rowset == nullptr) {
+                std::stringstream ss;
+                ss << "fail to get latest version of tablet: " << tablet_id;
+                LOG(WARNING) << ss.str();
+                return Status::InternalError(ss.str());
+            }
+
+            // Acquire the rowset readers now, while the scanner is being prepared.
+            // With many scanners (for example 10000), the rowsets could otherwise
+            // be compacted before the last scanner starts.
+            Version rd_version(0, _version);
+            Status acquire_reader_st =
+                    _tablet->capture_rs_readers(rd_version, &_tablet_reader_params.rs_readers);
+            if (!acquire_reader_st.ok()) {
+                LOG(WARNING) << "fail to init reader.res=" << acquire_reader_st;
+                std::stringstream ss;
+                ss << "failed to initialize storage reader. tablet=" << _tablet->full_name()
+                   << ", res=" << acquire_reader_st
+                   << ", backend=" << BackendOptions::get_localhost();
+                return Status::InternalError(ss.str());
+            }
+
+            // Initialize tablet_reader_params while still holding the header read lock.
+            RETURN_IF_ERROR(_init_tablet_reader_params(key_ranges, filters, bloom_filters,
+                                                       function_filters));
+        }
+    }
+
+    return Status::OK();
+}
+
+// Opens the scanner: runs the base-class open, then initializes the tablet
+// reader with the params built by prepare(). A reader init failure is reported
+// as InternalError carrying the tablet name, the reader status and the backend.
+Status NewOlapScanner::open(RuntimeState* state) {
+    RETURN_IF_ERROR(VScanner::open(state));
+    // Currently disabled:
+    // SCOPED_TIMER(_parent->_reader_init_timer);
+    // SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
+    // _runtime_filter_marks.resize(_parent->runtime_filter_descs().size(), false);
+
+    const auto init_status = _tablet_reader->init(_tablet_reader_params);
+    if (init_status.ok()) {
+        return Status::OK();
+    }
+
+    std::stringstream msg;
+    msg << "failed to initialize storage reader. tablet="
+        << _tablet_reader_params.tablet->full_name() << ", res=" << init_status
+        << ", backend=" << BackendOptions::get_localhost();
+    return Status::InternalError(msg.str());
+}
+
+// Fills _tablet_reader_params from the tablet, the pushed-down filters and the
+// key ranges. prepare() calls it while holding the tablet header read lock,
+// because it inspects the rowset readers captured there.
+Status NewOlapScanner::_init_tablet_reader_params(
+        const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters,
+        const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>& bloom_filters,
+        const std::vector<FunctionFilter>& function_filters) {
+    auto& params = _tablet_reader_params;
+    const auto& readers = params.rs_readers;
+
+    // Single version: either one rowset [0-x], or an empty [0-1] rowset followed by
+    // one [2-y] rowset, where that rowset's segments do not overlap. In that case
+    // (or when aggregating) the reader runs in direct mode.
+    const auto non_overlapping = [&readers](size_t i) {
+        return !readers[i]->rowset()->rowset_meta()->is_segments_overlapping();
+    };
+    const bool one_rowset_from_zero = readers.size() == 1 &&
+                                      readers[0]->rowset()->start_version() == 0 &&
+                                      non_overlapping(0);
+    const bool empty_base_then_one_rowset =
+            readers.size() == 2 && readers[0]->rowset()->rowset_meta()->num_rows() == 0 &&
+            readers[1]->rowset()->start_version() == 2 && non_overlapping(1);
+    const bool single_version = one_rowset_from_zero || empty_base_then_one_rowset;
+
+    params.direct_mode = _aggregation || single_version;
+
+    RETURN_IF_ERROR(_init_return_columns(!params.direct_mode));
+
+    params.tablet = _tablet;
+    params.tablet_schema = _tablet_schema;
+    params.reader_type = READER_QUERY;
+    params.aggregation = _aggregation;
+    params.version = Version(0, _version);
+
+    // Pushed-down predicates: conditions, bloom filters, function filters, plus
+    // the tablet's delete predicates.
+    for (const auto& condition : filters) {
+        params.conditions.push_back(condition);
+    }
+    std::copy(bloom_filters.cbegin(), bloom_filters.cend(),
+              std::inserter(params.bloom_filters, params.bloom_filters.begin()));
+    std::copy(function_filters.cbegin(), function_filters.cend(),
+              std::inserter(params.function_filters, params.function_filters.begin()));
+    std::copy(_tablet->delete_predicates().cbegin(), _tablet->delete_predicates().cend(),
+              std::inserter(params.delete_predicates, params.delete_predicates.begin()));
+
+    // Key ranges. A range whose begin is a single NEGATIVE_INFINITY value is
+    // skipped. The include flags are shared by all ranges, so the last accepted
+    // range decides them.
+    for (auto* range : key_ranges) {
+        const bool unbounded_begin = range->begin_scan_range.size() == 1 &&
+                                     range->begin_scan_range.get_value(0) == NEGATIVE_INFINITY;
+        if (unbounded_begin) {
+            continue;
+        }
+        params.start_key_include = range->begin_include;
+        params.end_key_include = range->end_include;
+        params.start_key.push_back(range->begin_scan_range);
+        params.end_key.push_back(range->end_scan_range);
+    }
+
+    params.profile = _parent->runtime_profile();
+    params.runtime_state = _state;
+
+    params.origin_return_columns = &_return_columns;
+    params.tablet_columns_convert_to_null_set = &_tablet_columns_convert_to_null_set;
+
+    if (params.direct_mode) {
+        params.return_columns = _return_columns;
+    } else {
+        // The storage engine aggregates in this mode, so all key columns are read
+        // first, followed by the requested value columns.
+        for (size_t key_idx = 0; key_idx < _tablet_schema->num_key_columns(); ++key_idx) {
+            params.return_columns.push_back(key_idx);
+        }
+        for (auto col_idx : _return_columns) {
+            if (!_tablet_schema->column(col_idx).is_key()) {
+                params.return_columns.push_back(col_idx);
+            }
+        }
+    }
+
+    // When an aggregation node directly consumes this scan node, agg finalize is
+    // not called here, avoiding unnecessary SerDe.
+    params.need_agg_finalize = _need_agg_finalize;
+
+    if (!config::disable_storage_page_cache) {
+        params.use_page_cache = true;
+    }
+
+    if (_tablet->enable_unique_key_merge_on_write()) {
+        params.delete_bitmap = &_tablet->tablet_meta()->delete_bitmap();
+    }
+
+    // Read in key order when the plan carries sort info.
+    const auto& olap_scan_node = static_cast<NewOlapScanNode*>(_parent)->_olap_scan_node;
+    const bool has_sort_info = olap_scan_node.__isset.sort_info &&
+                               olap_scan_node.sort_info.is_asc_order.size() > 0;
+    if (has_sort_info) {
+        _limit = _parent->_limit_per_scanner;
+        params.read_orderby_key = true;
+        if (!olap_scan_node.sort_info.is_asc_order[0]) {
+            params.read_orderby_key_reverse = true;
+        }
+        params.read_orderby_key_num_prefix_columns = olap_scan_node.sort_info.is_asc_order.size();
+    }
+
+    return Status::OK();
+}
+
+// Maps each materialized output slot to its tablet column index in _return_columns,
+// recording non-nullable tablet columns whose slots are nullable. When
+// `need_seq_col` is true, the tablet has a sequence column, and some requested
+// column uses REPLACE aggregation, the sequence column is appended if missing.
+// Returns InternalError for an unknown slot or when no column is requested.
+Status NewOlapScanner::_init_return_columns(bool need_seq_col) {
+    for (auto* slot : _output_tuple_desc->slots()) {
+        if (!slot->is_materialized()) {
+            continue;
+        }
+
+        // Prefer the column unique id when the plan provides one; otherwise the name.
+        int32_t index = 0;
+        if (slot->col_unique_id() >= 0) {
+            index = _tablet_schema->field_index(slot->col_unique_id());
+        } else {
+            index = _tablet_schema->field_index(slot->col_name());
+        }
+        if (index < 0) {
+            std::stringstream ss;
+            ss << "field name is invalid. field=" << slot->col_name();
+            LOG(WARNING) << ss.str();
+            return Status::InternalError(ss.str());
+        }
+
+        _return_columns.push_back(index);
+        const bool widen_to_nullable =
+                slot->is_nullable() && !_tablet_schema->column(index).is_nullable();
+        if (widen_to_nullable) {
+            _tablet_columns_convert_to_null_set.emplace(index);
+        }
+    }
+
+    // Expand the sequence column.
+    if (need_seq_col && _tablet_schema->has_sequence_col()) {
+        const bool has_replace_col = std::any_of(
+                _return_columns.begin(), _return_columns.end(), [this](uint32_t col) {
+                    return _tablet_schema->column(col).aggregation() ==
+                           FieldAggregationMethod::OLAP_FIELD_AGGREGATION_REPLACE;
+                });
+        const auto seq_idx = _tablet_schema->sequence_col_idx();
+        const bool seq_already_requested =
+                std::find(_return_columns.begin(), _return_columns.end(), seq_idx) !=
+                _return_columns.end();
+        if (has_replace_col && !seq_already_requested) {
+            _return_columns.push_back(seq_idx);
+        }
+    }
+
+    if (_return_columns.empty()) {
+        return Status::InternalError("failed to build storage scanner, no materialized slot!");
+    }
+    return Status::OK();
+}
+
+// Reads the next block from the tablet reader.
+// ATTN: the interface requires that `eof` is true only when the returned block
+// is empty, so a block with rows always clears `eof`.
+Status NewOlapScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
+    RETURN_IF_ERROR(_tablet_reader->next_block_with_aggregation(block, nullptr, nullptr, eof));
+    const bool got_rows = block->rows() > 0;
+    if (got_rows) {
+        *eof = false;
+    }
+    return Status::OK();
+}
+
+// Releases this scanner's resources and then runs the base-class close.
+// Returns OK immediately if the scanner is already closed.
+// The rowset readers are cleared explicitly because otherwise they would only be
+// released when the runtime state is destructed, while the reader destructors
+// still reference the runtime state, which crashes.
+Status NewOlapScanner::close(RuntimeState* state) {
+    if (_is_closed) {
+        return Status::OK();
+    }
+
+    _tablet_reader_params.rs_readers.clear();
+    _update_counter();
+    _tablet_reader.reset();
+
+    // The base-class close must actually run (it releases base-class state).
+    // Call it before setting _is_closed, in case it returns early once the flag
+    // is set; mark the scanner closed even if it fails so cleanup is not repeated.
+    Status st = VScanner::close(state);
+    _is_closed = true;
+    return st;
+}
+
+// Hook for reporting scanner counters; intentionally empty for now.
+void NewOlapScanner::_update_counter() {
+    // TODO: not implemented yet.
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_olap_scanner.h 
b/be/src/vec/exec/scan/new_olap_scanner.h
new file mode 100644
index 0000000000..73c0330b20
--- /dev/null
+++ b/be/src/vec/exec/scan/new_olap_scanner.h
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exec/olap_utils.h"
+#include "exprs/bloomfilter_predicate.h"
+#include "exprs/function_filter.h"
+#include "olap/reader.h"
+#include "vec/exec/scan/vscanner.h"
+
+namespace doris {
+
+struct OlapScanRange;
+
+namespace vectorized {
+
+class NewOlapScanNode;
+
+// Scanner that reads one OLAP tablet through a BlockReader. prepare() builds
+// the reader and its params, open() initializes the reader, _get_block_impl()
+// reads blocks, and close() releases the readers.
+class NewOlapScanner : public VScanner {
+public:
+    // NOTE(review): `scan_range` is not used by the constructor; prepare()
+    // receives the range again.
+    NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t limit, bool aggregation,
+                   bool need_agg_finalize, const TPaloScanRange& scan_range, MemTracker* tracker);
+
+    // Initializes the tablet reader from the params built by prepare().
+    Status open(RuntimeState* state) override;
+
+    // Releases the rowset readers and the tablet reader; returns OK immediately
+    // if already closed.
+    Status close(RuntimeState* state) override;
+
+public:
+    // Resolves the tablet and schema for `scan_range`, clones the node's conjunct
+    // context, captures rowset readers, and builds the reader params from the key
+    // ranges and pushed-down filters.
+    Status prepare(const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges,
+                   VExprContext** vconjunct_ctx_ptr, const std::vector<TCondition>& filters,
+                   const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>&
+                           bloom_filters,
+                   const std::vector<FunctionFilter>& function_filters);
+
+protected:
+    // Reads the next block; `eos` is true only when the returned block is empty.
+    Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override;
+
+private:
+    // Placeholder for counter reporting; currently empty.
+    void _update_counter();
+
+    // Fills _tablet_reader_params; prepare() calls it under the tablet header read lock.
+    Status _init_tablet_reader_params(
+            const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters,
+            const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>&
+                    bloom_filters,
+            const std::vector<FunctionFilter>& function_filters);
+
+    // Maps materialized output slots to tablet column indexes in _return_columns.
+    Status _init_return_columns(bool need_seq_col);
+
+private:
+    bool _aggregation;
+    bool _need_agg_finalize;
+
+    // Copy of the tablet's schema, or the schema from the plan's columns_desc.
+    TabletSchemaSPtr _tablet_schema;
+    TabletSharedPtr _tablet;
+    // Upper bound of the version range [0, _version]; -1 until prepare() parses it.
+    int64_t _version;
+
+    TabletReader::ReaderParams _tablet_reader_params;
+    std::unique_ptr<TabletReader> _tablet_reader;
+
+    // Tablet column indexes to return, in output slot order (plus an optional
+    // sequence column).
+    std::vector<uint32_t> _return_columns;
+    // Indexes of non-nullable tablet columns whose output slots are nullable.
+    std::unordered_set<uint32_t> _tablet_columns_convert_to_null_set;
+};
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
new file mode 100644
index 0000000000..bef4fe8e66
--- /dev/null
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -0,0 +1,232 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "scanner_context.h"
+
+#include "common/config.h"
+#include "runtime/runtime_state.h"
+#include "util/threadpool.h"
+#include "vec/core/block.h"
+#include "vec/exec/scan/scanner_scheduler.h"
+#include "vec/exec/scan/vscanner.h"
+
+namespace doris::vectorized {
+
+// Initializes the context before it is first submitted to the scheduler:
+// pre-allocates free blocks, computes the maximum scanner concurrency, takes the
+// query's thread token and sets up the scheduling counters.
+Status ScannerContext::init() {
+    _real_tuple_desc = _input_tuple_desc != nullptr ? _input_tuple_desc : _output_tuple_desc;
+    // 1. Calculate how many blocks need to be preallocated.
+    // The calculation logic is as follows:
+    //  1. Assuming that at most M rows can be scanned in one scan (config::doris_scanner_row_num),
+    //     figure out how many blocks are required for one scan (_block_per_scanner).
+    //  2. The maximum concurrency * the blocks required for one scan
+    //     is the number of blocks that need to be pre-allocated.
+    auto doris_scanner_row_num =
+            limit == -1 ? config::doris_scanner_row_num
+                        : std::min(static_cast<int64_t>(config::doris_scanner_row_num), limit);
+    int real_block_size = limit == -1 ? _state->batch_size()
+                                      : std::min(static_cast<int64_t>(_state->batch_size()), limit);
+    // A limit of 0 would make real_block_size 0 and divide by zero below. A block
+    // must hold at least one row, and a scanner needs at least one block.
+    if (real_block_size <= 0) {
+        real_block_size = 1;
+    }
+    _block_per_scanner = (doris_scanner_row_num + (real_block_size - 1)) / real_block_size;
+    if (_block_per_scanner <= 0) {
+        _block_per_scanner = 1;
+    }
+    auto pre_alloc_block_count =
+            std::min(static_cast<int32_t>(_scanners.size()),
+                     config::doris_scanner_thread_pool_thread_num) *
+            _block_per_scanner;
+
+    // The free blocks are used for the final output blocks of scanners,
+    // so they are built from _output_tuple_desc.
+    for (int i = 0; i < pre_alloc_block_count; ++i) {
+        auto block = new vectorized::Block(_output_tuple_desc->slots(), real_block_size);
+        _free_blocks.emplace_back(block);
+    }
+
+    // 2. Calculate max concurrency
+    _max_thread_num = config::doris_scanner_thread_pool_thread_num;
+    if (config::doris_scanner_row_num > _state->batch_size()) {
+        _max_thread_num /= config::doris_scanner_row_num / _state->batch_size();
+        if (_max_thread_num <= 0) {
+            _max_thread_num = 1;
+        }
+    }
+
+    // 3. get thread token
+    thread_token = _state->get_query_fragments_ctx()->get_token();
+
+    // 4. This ctx will be submitted to the scanner scheduler right after init,
+    // so set _num_scheduling_ctx to 1 here.
+    _num_scheduling_ctx = 1;
+
+    _num_unfinished_scanners = _scanners.size();
+
+    return Status::OK();
+}
+
+// Hands out a block for a scanner to fill. A pre-allocated free block is reused
+// when one is available. Otherwise a new block for batch_size rows is allocated
+// from _real_tuple_desc and `*get_free_block` is set to false. The flag is only
+// written on that fallback path, so callers must initialize it themselves.
+vectorized::Block* ScannerContext::get_free_block(bool* get_free_block) {
+    {
+        std::lock_guard<std::mutex> guard(_free_blocks_lock);
+        if (!_free_blocks.empty()) {
+            auto* reused = _free_blocks.back();
+            _free_blocks.pop_back();
+            return reused;
+        }
+    }
+
+    // No free block left: fall back to a fresh allocation and tell the caller.
+    *get_free_block = false;
+    return new vectorized::Block(_real_tuple_desc->slots(), _state->batch_size());
+}
+
+// Takes a block back for reuse. Its column data is cleared before the lock is
+// taken; then the block is appended to the free list.
+void ScannerContext::return_free_block(vectorized::Block* block) {
+    block->clear_column_data();
+
+    std::lock_guard<std::mutex> guard(_free_blocks_lock);
+    _free_blocks.emplace_back(block);
+}
+
+// Appends scanned blocks to the blocks queue, adds their allocated bytes to the
+// queued-bytes counter, and wakes one waiter in get_block_from_queue().
+void ScannerContext::append_blocks_to_queue(const std::vector<vectorized::Block*>& blocks) {
+    std::lock_guard<std::mutex> guard(_transfer_lock);
+    for (auto* b : blocks) {
+        blocks_queue.push_back(b);
+        _cur_bytes_in_queue += b->allocated_bytes();
+    }
+    _blocks_queue_added_cv.notify_one();
+}
+
+// Pops the next scanned block. Waits, re-checking at least once per second, until
+// a block is queued, the context is finished, or an error status has been set.
+// Returns that error status if one was set. Otherwise stores the block in `*block`
+// (leaving `*eos` untouched), or, if the queue is empty, sets `*eos` to the
+// finished flag.
+Status ScannerContext::get_block_from_queue(vectorized::Block** block, bool* eos) {
+    std::unique_lock<std::mutex> lock(_transfer_lock);
+    const auto should_wait = [this] {
+        return _process_status.ok() && !_is_finished && blocks_queue.empty();
+    };
+    while (should_wait()) {
+        _blocks_queue_added_cv.wait_for(lock, std::chrono::seconds(1));
+    }
+
+    if (!_process_status.ok()) {
+        return _process_status;
+    }
+    if (blocks_queue.empty()) {
+        *eos = _is_finished;
+        return Status::OK();
+    }
+
+    *block = blocks_queue.front();
+    blocks_queue.pop_front();
+    _cur_bytes_in_queue -= (*block)->allocated_bytes();
+    return Status::OK();
+}
+
+bool ScannerContext::set_status_on_error(const Status& status) {
+    std::lock_guard<std::mutex> l(_transfer_lock);
+    if (_process_status.ok()) {
+        _process_status = status;
+        _blocks_queue_added_cv.notify_one();
+        return true;
+    }
+    return false;
+}
+
+// Closes every scanner still held in _scanners and empties the list. Scanners live
+// in the scan node's ObjectPool, so they are not deleted here. The status returned
+// by each close() is ignored.
+Status ScannerContext::_close_and_clear_scanners() {
+    std::unique_lock<std::mutex> lock(_scanners_lock);
+    std::for_each(_scanners.begin(), _scanners.end(),
+                  [this](VScanner* scanner) { scanner->close(_state); });
+    _scanners.clear();
+    return Status::OK();
+}
+
+// Closes the scanners still queued in this context, then waits until no scanner
+// is running and the context is no longer being scheduled. Finally it deletes
+// every block still held in the blocks queue and the free list.
+// Both containers are emptied after deletion, so a repeated call cannot delete a
+// block twice and later size reports (e.g. debug_string()) stay accurate.
+void ScannerContext::clear_and_join() {
+    _close_and_clear_scanners();
+
+    std::unique_lock<std::mutex> l(_transfer_lock);
+    // wait() with a predicate checks it before blocking, so no separate fast path is needed.
+    _ctx_finish_cv.wait(
+            l, [this] { return _num_running_scanners == 0 && _num_scheduling_ctx == 0; });
+
+    std::for_each(blocks_queue.begin(), blocks_queue.end(),
+                  std::default_delete<vectorized::Block>());
+    blocks_queue.clear();
+    _cur_bytes_in_queue = 0;
+
+    // Same lock order as get_next_batch_of_scanners(): _transfer_lock, then _free_blocks_lock.
+    std::lock_guard<std::mutex> free_blocks_guard(_free_blocks_lock);
+    std::for_each(_free_blocks.begin(), _free_blocks.end(),
+                  std::default_delete<vectorized::Block>());
+    _free_blocks.clear();
+}
+
+// Returns a one-line summary of the context's state for logging.
+// NOTE(review): the fields are read without taking any lock, so the values may be
+// mutually inconsistent while scanners are running.
+std::string ScannerContext::debug_string() {
+    return fmt::format(
+            "id: {}, scanners: {}, blocks in queue: {},"
+            " status: {}, _should_stop: {}, _is_finished: {}, free blocks: {},"
+            " limit: {}, _num_running_scanners: {}, _num_scheduling_ctx: {}, _max_thread_num: {},"
+            " _block_per_scanner: {}, _cur_bytes_in_queue: {}, _max_bytes_in_queue: {}",
+            ctx_id, _scanners.size(), blocks_queue.size(), _process_status.ok(), _should_stop,
+            _is_finished, _free_blocks.size(), limit, _num_running_scanners, _num_scheduling_ctx,
+            _max_thread_num, _block_per_scanner, _cur_bytes_in_queue, _max_bytes_in_queue);
+}
+
+// Returns `scanner` to this context after it ran, and resubmits the context to
+// `scheduler`. NOTE(review): presumably called from a scanner worker thread at the
+// end of one scan round — confirm against ScannerScheduler.
+// If the scanner reports need_to_close() and it was the last unfinished scanner,
+// the context is marked finished and the consumer waiting for blocks is woken.
+// NOTE(review): statement order under _transfer_lock matters for the counters
+// that clear_and_join() waits on; keep it unchanged.
+void ScannerContext::push_back_scanner_and_reschedule(ScannerScheduler* scheduler,
+                                                      VScanner* scanner) {
+    {
+        std::unique_lock<std::mutex> l(_scanners_lock);
+        // Front of the list: get_next_batch_of_scanners() takes scanners from the front.
+        _scanners.push_front(scanner);
+    }
+
+    std::lock_guard<std::mutex> l(_transfer_lock);
+    // This scanner is no longer running; the context is scheduled once more.
+    _num_running_scanners--;
+    _num_scheduling_ctx++;
+    scheduler->submit(this);
+    if (scanner->need_to_close() && (--_num_unfinished_scanners) == 0) {
+        _is_finished = true;
+        _blocks_queue_added_cv.notify_one();
+    }
+    // Wake clear_and_join(), which waits for both counters to reach zero.
+    _ctx_finish_cv.notify_one();
+}
+
+// Moves the next batch of runnable scanners from _scanners into `current_run`.
+// A batch is formed only while the blocks queue holds less than half of
+// _max_bytes_in_queue; otherwise nothing is scheduled this round. The batch size
+// is the number of free blocks divided by _block_per_scanner (rounded up), capped
+// by the remaining thread budget, and at least 1. Scanners that report
+// need_to_close() are closed and dropped; they do not count toward the batch.
+void ScannerContext::get_next_batch_of_scanners(std::list<VScanner*>* current_run) {
+    // 1. Calculate how many scanners should be scheduled in this run.
+    int thread_slot_num = 0;
+    {
+        std::unique_lock<std::mutex> transfer_guard(_transfer_lock);
+        if (_cur_bytes_in_queue < _max_bytes_in_queue / 2) {
+            // There is enough space in the blocks queue, so the number of
+            // scanners depends on the number of free blocks.
+            std::lock_guard<std::mutex> free_blocks_guard(_free_blocks_lock);
+            // Defensive: a zero _block_per_scanner would otherwise divide by zero.
+            const auto blocks_per_scanner = _block_per_scanner > 0 ? _block_per_scanner : 1;
+            thread_slot_num = _free_blocks.size() / blocks_per_scanner;
+            thread_slot_num += (_free_blocks.size() % blocks_per_scanner != 0);
+            thread_slot_num = std::min(thread_slot_num, _max_thread_num - _num_running_scanners);
+            if (thread_slot_num <= 0) {
+                thread_slot_num = 1;
+            }
+        }
+    }
+
+    // 2. Take up to thread_slot_num scanners from the front of _scanners
+    // and put them into `current_run`.
+    {
+        std::unique_lock<std::mutex> scanners_guard(_scanners_lock);
+        for (int i = 0; i < thread_slot_num && !_scanners.empty();) {
+            auto scanner = _scanners.front();
+            _scanners.pop_front();
+            if (scanner->need_to_close()) {
+                // Finished scanners are closed here and not rescheduled.
+                scanner->close(_state);
+            } else {
+                current_run->push_back(scanner);
+                i++;
+            }
+        }
+    }
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/scanner_context.h 
b/be/src/vec/exec/scan/scanner_context.h
new file mode 100644
index 0000000000..10728bc7df
--- /dev/null
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <condition_variable>
+
+#include "common/status.h"
+#include "runtime/descriptors.h"
+#include "util/uid_util.h"
+#include "vec/core/block.h"
+
+namespace doris {
+
+class PriorityThreadPool;
+class ThreadPool;
+class ThreadPoolToken;
+class ScannerScheduler;
+
+namespace vectorized {
+
+class VScanner;
+
+// ScannerContext is responsible for recording the execution status
+// of a group of Scanners corresponding to a ScanNode.
+// Including how many scanners are being scheduled, and maintaining
+// a producer-consumer blocks queue between scanners and scan nodes.
+//
+// ScannerContext is also the scheduling unit of ScannerScheduler.
+// ScannerScheduler schedules a ScannerContext at a time,
+// and submits the Scanners to the scanner thread pool for data scanning.
+class ScannerContext {
+public:
+    ScannerContext(RuntimeState* state_, const TupleDescriptor* 
input_tuple_desc,
+                   const TupleDescriptor* output_tuple_desc, const 
std::list<VScanner*>& scanners_,
+                   int64_t limit_, int64_t max_bytes_in_blocks_queue_)
+            : _state(state_),
+              _input_tuple_desc(input_tuple_desc),
+              _output_tuple_desc(output_tuple_desc),
+              _process_status(Status::OK()),
+              limit(limit_),
+              _max_bytes_in_queue(max_bytes_in_blocks_queue_),
+              _scanners(scanners_) {
+        ctx_id = UniqueId::gen_uid().to_string();
+        if (_scanners.empty()) {
+            _is_finished = true;
+        }
+    }
+
+    Status init();
+
+    vectorized::Block* get_free_block(bool* get_free_block);
+    void return_free_block(vectorized::Block* block);
+
+    // Append blocks from scanners to the blocks queue.
+    void append_blocks_to_queue(const std::vector<vectorized::Block*>& blocks);
+
+    // Get next block from blocks queue. Called by ScanNode
+    // Set eos to true if there is no more data to read.
+    // And if eos is true, the block returned must be nullptr.
+    Status get_block_from_queue(vectorized::Block** block, bool* eos);
+
+    // When a scanner complete a scan, this method will be called
+    // to return the scanner to the list for next scheduling.
+    void push_back_scanner_and_reschedule(ScannerScheduler* scheduler, 
VScanner* scanner);
+
+    bool set_status_on_error(const Status& status);
+
+    Status status() {
+        std::lock_guard<std::mutex> l(_transfer_lock);
+        return _process_status;
+    }
+
+    // Called by ScanNode.
+    // Used to notify the scheduler that this ScannerContext can stop working.
+    void set_should_stop() {
+        std::lock_guard<std::mutex> l(_transfer_lock);
+        _should_stop = true;
+        _blocks_queue_added_cv.notify_one();
+    }
+
+    // Return true if this ScannerContext need no more process
+    bool done() {
+        std::lock_guard<std::mutex> l(_transfer_lock);
+        return _is_finished || _should_stop || !_process_status.ok();
+    }
+
+    // Update the running num of scanners and contexts
+    void update_num_running(int32_t scanner_inc, int32_t sched_inc) {
+        std::lock_guard<std::mutex> l(_transfer_lock);
+        _num_running_scanners += scanner_inc;
+        _num_scheduling_ctx += sched_inc;
+        _blocks_queue_added_cv.notify_one();
+        _ctx_finish_cv.notify_one();
+    }
+
+    void get_next_batch_of_scanners(std::list<VScanner*>* current_run);
+
+    void clear_and_join();
+
+    std::string debug_string();
+
+    RuntimeState* state() { return _state; }
+
+public:
+    // the unique id of this context
+    std::string ctx_id;
+    int32_t queue_idx = -1;
+    ThreadPoolToken* thread_token;
+
+private:
+    Status _close_and_clear_scanners();
+
+private:
+    RuntimeState* _state;
+
+    // the comment of same fields in VScanNode
+    const TupleDescriptor* _input_tuple_desc;
+    const TupleDescriptor* _output_tuple_desc;
+    // If _input_tuple_desc is not null, _real_tuple_desc point to 
_input_tuple_desc,
+    // otherwise, _real_tuple_desc point to _output_tuple_desc
+    const TupleDescriptor* _real_tuple_desc;
+
+    // _transfer_lock is used to protect the critical section
+    // where the ScanNode and ScannerScheduler interact.
+    // Including access to variables such as blocks_queue, _process_status, 
_is_finished, etc.
+    std::mutex _transfer_lock;
+    // The blocks got from scanners will be added to the "blocks_queue".
+    // And the upper scan node will be as a consumer to fetch blocks from this 
queue.
+    // Should be protected by "_transfer_lock"
+    std::list<vectorized::Block*> blocks_queue;
+    // Wait in get_block_from_queue(), by ScanNode.
+    std::condition_variable _blocks_queue_added_cv;
+    // Wait in clear_and_join(), by ScanNode.
+    std::condition_variable _ctx_finish_cv;
+
+    // The following 3 variables control the process of the scanner scheduling.
+    // Use _transfer_lock to protect them.
+    // 1. _process_status
+    //      indicates the global status of this scanner context.
+    //      Set to non-ok if encounter errors.
+    //      And if it is non-ok, the scanner process should stop.
+    //      Set be set by either ScanNode or ScannerScheduler.
+    // 2. _should_stop
+    //      Always be set by ScanNode.
+    //      True means no more data need to be read(reach limit or closed)
+    // 3. _is_finished
+    //      Always be set by ScannerScheduler.
+    //      True means all scanners are finished to scan.
+    Status _process_status;
+    bool _should_stop = false;
+    bool _is_finished = false;
+
+    // Pre-allocated blocks for all scanners to share, for memory reuse.
+    std::mutex _free_blocks_lock;
+    std::vector<vectorized::Block*> _free_blocks;
+
+    // The limit from SQL's limit clause
+    int64_t limit;
+
+    // Current number of running scanners.
+    int32_t _num_running_scanners = 0;
+    // Current number of ctx being scheduled.
+    // After each Scanner finishes a task, it will put the corresponding ctx
+    // back into the scheduling queue.
+    // Therefore, there will be multiple pointer of same ctx in the scheduling 
queue.
+    // Here we record the number of ctx in the scheduling  queue to clean up 
at the end.
+    int32_t _num_scheduling_ctx = 0;
+    // Num of unfinished scanners. Should be set in init()
+    int32_t _num_unfinished_scanners = 0;
+    // Max number of scan thread for this scanner context.
+    int32_t _max_thread_num = 0;
+    // How many blocks a scanner can use in one task.
+    int32_t _block_per_scanner = 0;
+
+    // The current bytes of blocks in blocks queue
+    int64_t _cur_bytes_in_queue = 0;
+    // The max limit bytes of blocks in blocks queue
+    int64_t _max_bytes_in_queue;
+
+    // List "scanners" saves all "unfinished" scanners.
+    // The scanner scheduler will pop scanners from this list, run scanner,
+    // and then if the scanner is not finished, will be pushed back to this 
list.
+    // Not need to protect by lock, because only one scheduler thread will 
access to it.
+    std::mutex _scanners_lock;
+    std::list<VScanner*> _scanners;
+
+    // TODO: Add statistics of this scanner
+};
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp 
b/be/src/vec/exec/scan/scanner_scheduler.cpp
new file mode 100644
index 0000000000..7ddd5e304b
--- /dev/null
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -0,0 +1,253 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "scanner_scheduler.h"
+
+#include "common/config.h"
+#include "util/priority_thread_pool.hpp"
+#include "util/priority_work_stealing_thread_pool.hpp"
+#include "util/thread.h"
+#include "util/threadpool.h"
+#include "vec/core/block.h"
+#include "vec/exec/scan/vscanner.h"
+#include "vec/exprs/vexpr.h"
+
+namespace doris::vectorized {
+
+ScannerScheduler::ScannerScheduler() {}
+
+ScannerScheduler::~ScannerScheduler() {
+    _is_closed = true;
+    _scheduler_pool->shutdown();
+    _local_scan_thread_pool->shutdown();
+    _remote_scan_thread_pool->shutdown();
+    // TODO: safely delete all objects and graceful exit
+}
+
+Status ScannerScheduler::init(ExecEnv* env) {
+    // 1. scheduling thread pool and scheduling queues
+    ThreadPoolBuilder("SchedulingThreadPool")
+            .set_min_threads(QUEUE_NUM)
+            .set_max_threads(QUEUE_NUM)
+            .build(&_scheduler_pool);
+
+    _pending_queues = new BlockingQueue<ScannerContext*>*[QUEUE_NUM];
+    for (int i = 0; i < QUEUE_NUM; i++) {
+        _pending_queues[i] = new BlockingQueue<ScannerContext*>(INT32_MAX);
+        _scheduler_pool->submit_func([this, i] { this->_schedule_thread(i); });
+    }
+
+    // 2. local scan thread pool
+    _local_scan_thread_pool = new PriorityWorkStealingThreadPool(
+            config::doris_scanner_thread_pool_thread_num, 
env->store_paths().size(),
+            config::doris_scanner_thread_pool_queue_size);
+
+    // 3. remote scan thread pool
+    _remote_scan_thread_pool = new 
PriorityThreadPool(config::doris_scanner_thread_pool_thread_num,
+                                                      
config::doris_scanner_thread_pool_queue_size);
+
+    return Status::OK();
+}
+
+Status ScannerScheduler::submit(ScannerContext* ctx) {
+    if (ctx->queue_idx == -1) {
+        ctx->queue_idx = (_queue_idx++ % QUEUE_NUM);
+    }
+    if (!_pending_queues[ctx->queue_idx]->blocking_put(ctx)) {
+        return Status::InternalError("failed to submit scanner context to 
scheduler");
+    }
+    return Status::OK();
+}
+
+void ScannerScheduler::_schedule_thread(int queue_id) {
+    BlockingQueue<ScannerContext*>* queue = _pending_queues[queue_id];
+    while (!_is_closed) {
+        ScannerContext* ctx;
+        bool ok = queue->blocking_get(&ctx);
+        if (!ok) {
+            // maybe closed
+            continue;
+        }
+
+        _schedule_scanners(ctx);
+        // If ctx is done, no need to schedule it again.
+        // But should notice that there may still scanners running in scanner 
pool.
+    }
+    return;
+}
+
+void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) {
+    if (ctx->done()) {
+        ctx->update_num_running(0, -1);
+        return;
+    }
+
+    std::list<VScanner*> this_run;
+    ctx->get_next_batch_of_scanners(&this_run);
+    if (this_run.empty()) {
+        submit(ctx);
+        return;
+    }
+
+    ctx->update_num_running(this_run.size(), -1);
+    // Submit scanners to thread pool
+    // TODO(cmy): How to handle this "nice"?
+    int nice = 1;
+    auto cur_span = opentelemetry::trace::Tracer::GetCurrentSpan();
+    auto iter = this_run.begin();
+    if (ctx->thread_token != nullptr) {
+        while (iter != this_run.end()) {
+            auto s = ctx->thread_token->submit_func(
+                    [this, scanner = *iter, parent_span = cur_span, ctx] {
+                        opentelemetry::trace::Scope scope {parent_span};
+                        this->_scanner_scan(this, ctx, scanner);
+                    });
+            if (s.ok()) {
+                (*iter)->start_wait_worker_timer();
+                this_run.erase(iter++);
+            } else {
+                ctx->set_status_on_error(s);
+                break;
+            }
+        }
+    } else {
+        while (iter != this_run.end()) {
+            PriorityThreadPool::Task task;
+            task.work_function = [this, scanner = *iter, parent_span = 
cur_span, ctx] {
+                opentelemetry::trace::Scope scope {parent_span};
+                this->_scanner_scan(this, ctx, scanner);
+            };
+            task.priority = nice;
+            task.queue_id = (*iter)->queue_id();
+            (*iter)->start_wait_worker_timer();
+
+            TabletStorageType type = (*iter)->get_storage_type();
+            bool ret = false;
+            if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
+                ret = _local_scan_thread_pool->offer(task);
+            } else {
+                ret = _remote_scan_thread_pool->offer(task);
+            }
+            if (ret) {
+                this_run.erase(iter++);
+            } else {
+                ctx->set_status_on_error(
+                        Status::InternalError("failed to submit scanner to 
scanner pool"));
+                break;
+            }
+        }
+    }
+}
+
+void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, 
ScannerContext* ctx,
+                                     VScanner* scanner) {
+    // TODO: rethink mem tracker and span
+    // START_AND_SCOPE_SPAN(scanner->runtime_state()->get_tracer(), span,
+    //                   "ScannerScheduler::_scanner_scan");
+    // SCOPED_ATTACH_TASK(scanner->runtime_state());
+
+    Thread::set_self_name("_scanner_scan");
+    // int64_t wait_time = scanner->update_wait_worker_timer();
+    // Do not use ScopedTimer. There is no guarantee that, the counter
+    // (_scan_cpu_timer, the class member) is not destroyed after 
`_running_thread==0`.
+    ThreadCpuStopWatch cpu_watch;
+    cpu_watch.start();
+    Status status = Status::OK();
+    bool eos = false;
+    RuntimeState* state = ctx->state();
+    DCHECK(nullptr != state);
+    if (!scanner->is_open()) {
+        status = scanner->open(state);
+        if (!status.ok()) {
+            ctx->set_status_on_error(status);
+            eos = true;
+        }
+        scanner->set_opened();
+    }
+
+    scanner->try_append_late_arrival_runtime_filter();
+
+    // Because we use thread pool to scan data from storage. One scanner can't
+    // use this thread too long, this can starve other query's scanner. So, we
+    // need yield this thread when we do enough work. However, OlapStorage read
+    // data in pre-aggregate mode, then we can't use storage returned data to
+    // judge if we need to yield. So we record all raw data read in this round
+    // scan, if this exceeds row number or bytes threshold, we yield this 
thread.
+    std::vector<vectorized::Block*> blocks;
+    int64_t raw_rows_read = scanner->raw_rows_read();
+    int64_t raw_rows_threshold = raw_rows_read + config::doris_scanner_row_num;
+    int64_t raw_bytes_read = 0;
+    int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
+    bool get_free_block = true;
+    int num_rows_in_block = 0;
+
+    // Has to wait at least one full block, or it will cause a lot of schedule 
task in priority
+    // queue, it will affect query latency and query concurrency for example 
ssb 3.3.
+    while (!eos && raw_bytes_read < raw_bytes_threshold &&
+           ((raw_rows_read < raw_rows_threshold && get_free_block) ||
+            num_rows_in_block < state->batch_size())) {
+        if (UNLIKELY(ctx->done())) {
+            eos = true;
+            status = Status::Cancelled("Cancelled");
+            LOG(INFO) << "Scan thread cancelled, cause query done, maybe reach 
limit.";
+            break;
+        }
+
+        auto block = ctx->get_free_block(&get_free_block);
+        status = scanner->get_block(state, block, &eos);
+        VLOG_ROW << "VOlapScanNode input rows: " << block->rows();
+        if (!status.ok()) {
+            LOG(WARNING) << "Scan thread read VOlapScanner failed: " << 
status.to_string();
+            // Add block ptr in blocks, prevent mem leak in read failed
+            blocks.push_back(block);
+            eos = true;
+            break;
+        }
+
+        raw_bytes_read += block->bytes();
+        num_rows_in_block += block->rows();
+        if (UNLIKELY(block->rows() == 0)) {
+            ctx->return_free_block(block);
+        } else {
+            if (!blocks.empty() && blocks.back()->rows() + block->rows() <= 
state->batch_size()) {
+                vectorized::MutableBlock(blocks.back()).merge(*block);
+                ctx->return_free_block(block);
+            } else {
+                blocks.push_back(block);
+            }
+        }
+        raw_rows_read = scanner->raw_rows_read();
+    } // end for while
+
+    // if we failed, check status.
+    if (UNLIKELY(!status.ok())) {
+        // _transfer_done = true;
+        ctx->set_status_on_error(status);
+        eos = true;
+        std::for_each(blocks.begin(), blocks.end(), 
std::default_delete<vectorized::Block>());
+    } else if (!blocks.empty()) {
+        ctx->append_blocks_to_queue(blocks);
+    }
+
+    if (eos) {
+        scanner->mark_to_need_to_close();
+    }
+
+    ctx->push_back_scanner_and_reschedule(scheduler, scanner);
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h 
b/be/src/vec/exec/scan/scanner_scheduler.h
new file mode 100644
index 0000000000..a72fd5021e
--- /dev/null
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "common/status.h"
+#include "util/blocking_queue.hpp"
+#include "vec/exec/scan/scanner_context.h"
+
+namespace doris::vectorized {
+
+// Responsible for the scheduling and execution of all Scanners of a BE node.
+// ScannerScheduler has two types of thread pools:
+// 1. Scheduling thread pool
+//     Responsible for Scanner scheduling.
+//     A set of Scanners for a query will be encapsulated into a ScannerContext
+//     and submitted to the ScannerScheduler's scheduling queue.
+//     There are multiple scheduling queues in ScannerScheduler, and each 
scheduling queue
+//     is handled by a scheduling thread.
+//     The scheduling thread is scheduled in granularity of ScannerContext,
+//     that is, a group of Scanners in a ScannerContext are scheduled at a 
time.
+//
+//2. Execution thread pool
+//     The scheduling thread will submit the Scanners selected from the 
ScannerContext
+//     to the execution thread pool to do the actual scan task.
+//     Each Scanner will act as a producer, read a group of blocks and put 
them into
+//     the corresponding block queue.
+//     The corresponding ScanNode will act as a consumer to consume blocks 
from the block queue.
+
+using ContextMap = phmap::parallel_flat_hash_map<
+        std::string, std::shared_ptr<ScannerContext>, 
phmap::priv::hash_default_hash<std::string>,
+        phmap::priv::hash_default_eq<std::string>,
+        std::allocator<std::pair<const std::string, 
std::shared_ptr<ScannerContext>>>, 12,
+        std::mutex>;
+
+class Env;
+class ScannerScheduler {
+public:
+    ScannerScheduler();
+    ~ScannerScheduler();
+
+    Status init(ExecEnv* env);
+
+    Status submit(ScannerContext* ctx);
+
+private:
+    // scheduling thread function
+    void _schedule_thread(int queue_id);
+    // schedule scanners in a certain ScannerContext
+    void _schedule_scanners(ScannerContext* ctx);
+    // execution thread function
+    void _scanner_scan(ScannerScheduler* scheduler, ScannerContext* ctx, 
VScanner* scanner);
+
+private:
+    // Scheduling queue number.
+    // TODO: make it configurable.
+    static const int QUEUE_NUM = 4;
+    // The ScannerContext will be submitted to the pending queue roundrobin.
+    // _queue_idx pointer to the current queue.
+    // The scheduler thread will take ctx from pending queue, schedule it,
+    // and put it to the _scheduling_map.
+    // If any scanner finish, it will take ctx from and put it to pending 
queue again.
+    std::atomic_int _queue_idx = {0};
+    BlockingQueue<ScannerContext*>** _pending_queues;
+
+    // scheduling thread pool
+    std::unique_ptr<ThreadPool> _scheduler_pool;
+    // execution thread pool
+    // _local_scan_thread_pool is for local scan task(typically, olap scanner)
+    // _remote_scan_thread_pool is for remote scan task(cold data on s3, hdfs, 
etc.)
+    PriorityThreadPool* _local_scan_thread_pool;
+    PriorityThreadPool* _remote_scan_thread_pool;
+
+    // true is the scheduler is closed.
+    std::atomic_bool _is_closed = {false};
+
+    ContextMap _context_map;
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vscan_node.cpp 
b/be/src/vec/exec/scan/vscan_node.cpp
new file mode 100644
index 0000000000..26f8847c1d
--- /dev/null
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -0,0 +1,864 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/scan/vscan_node.h"
+
+#include "exprs/hybrid_set.h"
+#include "runtime/runtime_filter_mgr.h"
+#include "util/stack_util.h"
+#include "util/threadpool.h"
+#include "vec/columns/column_const.h"
+#include "vec/exec/scan/scanner_scheduler.h"
+#include "vec/exec/scan/vscanner.h"
+#include "vec/exprs/vcompound_pred.h"
+#include "vec/exprs/vslot_ref.h"
+#include "vec/functions/in.h"
+
+namespace doris::vectorized {
+
// Runs `stmt` unless the caller's local `push_down` flag is true; in that case it returns from
// the enclosing (void) function instead. It expands to a plain if/else (not a do/while), so
// `break`/`continue` inside `stmt` still apply to the caller's loop.
#define RETURN_IF_PUSH_DOWN(stmt) \
    if (push_down) {              \
        return;                   \
    } else {                      \
        stmt;                     \
    }
+
+static bool ignore_cast(SlotDescriptor* slot, VExpr* expr) {
+    if ((slot->type().is_date_type() || slot->type().is_date_v2_type() ||
+         slot->type().is_datetime_v2_type()) &&
+        (expr->type().is_date_type() || expr->type().is_date_v2_type() ||
+         expr->type().is_datetime_v2_type())) {
+        return true;
+    }
+    if (slot->type().is_string_type() && expr->type().is_string_type()) {
+        return true;
+    }
+    return false;
+}
+
+Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::init(tnode, state));
+    _state = state;
+
+    const TQueryOptions& query_options = state->query_options();
+    if (query_options.__isset.max_scan_key_num) {
+        _max_scan_key_num = query_options.max_scan_key_num;
+    } else {
+        _max_scan_key_num = config::doris_max_scan_key_num;
+    }
+    if (query_options.__isset.max_pushdown_conditions_per_column) {
+        _max_pushdown_conditions_per_column = 
query_options.max_pushdown_conditions_per_column;
+    } else {
+        _max_pushdown_conditions_per_column = 
config::max_pushdown_conditions_per_column;
+    }
+
+    RETURN_IF_ERROR(_register_runtime_filter());
+
+    return Status::OK();
+}
+
+Status VScanNode::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::prepare(state));
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+
+    RETURN_IF_ERROR(_init_profile());
+
+    _input_tuple_desc = 
state->desc_tbl().get_tuple_descriptor(_input_tuple_id);
+    _output_tuple_desc = 
state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
+
+    // init profile for runtime filter
+    for (auto& rf_ctx : _runtime_filter_ctxs) {
+        rf_ctx.runtime_filter->init_profile(_runtime_profile.get());
+    }
+    return Status::OK();
+}
+
+Status VScanNode::open(RuntimeState* state) {
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::open");
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    RETURN_IF_CANCELLED(state);
+    RETURN_IF_ERROR(ExecNode::open(state));
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
+
+    RETURN_IF_ERROR(_acquire_runtime_filter());
+    RETURN_IF_ERROR(_process_conjuncts());
+
+    std::list<VScanner*> scanners;
+    RETURN_IF_ERROR(_init_scanners(&scanners));
+    if (scanners.empty()) {
+        _eos = true;
+    } else {
+        RETURN_IF_ERROR(_start_scanners(scanners));
+    }
+    return Status::OK();
+}
+
+Status VScanNode::get_next(RuntimeState* state, vectorized::Block* block, 
bool* eos) {
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    if (state->is_cancelled()) {
+        _scanner_ctx->set_status_on_error(Status::Cancelled("query 
cancelled"));
+        return _scanner_ctx->status();
+    }
+
+    if (_eos) {
+        *eos = true;
+        return Status::OK();
+    }
+
+    vectorized::Block* scan_block = nullptr;
+    RETURN_IF_ERROR(_scanner_ctx->get_block_from_queue(&scan_block, eos));
+    if (*eos) {
+        DCHECK(scan_block == nullptr);
+        return Status::OK();
+    }
+
+    // get scanner's block memory
+    block->swap(*scan_block);
+    _scanner_ctx->return_free_block(scan_block);
+
+    reached_limit(block, eos);
+    if (*eos) {
+        // reach limit, stop the scanners.
+        _scanner_ctx->set_should_stop();
+    }
+
+    return Status::OK();
+}
+
+Status VScanNode::_start_scanners(const std::list<VScanner*>& scanners) {
+    _scanner_ctx.reset(new ScannerContext(_state, _input_tuple_desc, 
_output_tuple_desc, scanners,
+                                          limit(), 
_state->query_options().mem_limit / 20));
+    RETURN_IF_ERROR(_scanner_ctx->init());
+    
RETURN_IF_ERROR(_state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get()));
+    return Status::OK();
+}
+
+Status VScanNode::_register_runtime_filter() {
+    int filter_size = _runtime_filter_descs.size();
+    _runtime_filter_ctxs.resize(filter_size);
+    _runtime_filter_ready_flag.resize(filter_size);
+    for (int i = 0; i < filter_size; ++i) {
+        IRuntimeFilter* runtime_filter = nullptr;
+        const auto& filter_desc = _runtime_filter_descs[i];
+        RETURN_IF_ERROR(_state->runtime_filter_mgr()->regist_filter(
+                RuntimeFilterRole::CONSUMER, filter_desc, 
_state->query_options(), id()));
+        
RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id,
+                                                                         
&runtime_filter));
+        _runtime_filter_ctxs[i].runtime_filter = runtime_filter;
+        _runtime_filter_ready_flag[i] = false;
+    }
+    return Status::OK();
+}
+
+Status VScanNode::_acquire_runtime_filter() {
+    std::vector<VExpr*> vexprs;
+    for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
+        IRuntimeFilter* runtime_filter = 
_runtime_filter_ctxs[i].runtime_filter;
+        bool ready = runtime_filter->is_ready();
+        if (!ready) {
+            ready = runtime_filter->await();
+        }
+        if (ready) {
+            RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&vexprs));
+            _runtime_filter_ctxs[i].apply_mark = true;
+        } else {
+            _is_all_rf_applied = false;
+        }
+    }
+    RETURN_IF_ERROR(_append_rf_into_conjuncts(vexprs));
+
+    return Status::OK();
+}
+
+Status VScanNode::_append_rf_into_conjuncts(std::vector<VExpr*>& vexprs) {
+    if (vexprs.empty()) {
+        return Status::OK();
+    }
+
+    auto last_expr = _vconjunct_ctx_ptr ? (*_vconjunct_ctx_ptr)->root() : 
vexprs[0];
+    for (size_t j = _vconjunct_ctx_ptr ? 0 : 1; j < vexprs.size(); j++) {
+        if (_rf_vexpr_set.find(vexprs[j]) != _rf_vexpr_set.end()) {
+            continue;
+        }
+        TFunction fn;
+        TFunctionName fn_name;
+        fn_name.__set_db_name("");
+        fn_name.__set_function_name("and");
+        fn.__set_name(fn_name);
+        fn.__set_binary_type(TFunctionBinaryType::BUILTIN);
+        std::vector<TTypeDesc> arg_types;
+        arg_types.push_back(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+        arg_types.push_back(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+        fn.__set_arg_types(arg_types);
+        fn.__set_ret_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+        fn.__set_has_var_args(false);
+
+        TExprNode texpr_node;
+        texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+        texpr_node.__set_node_type(TExprNodeType::COMPOUND_PRED);
+        texpr_node.__set_opcode(TExprOpcode::COMPOUND_AND);
+        texpr_node.__set_fn(fn);
+        texpr_node.__set_is_nullable(last_expr->is_nullable() || 
vexprs[j]->is_nullable());
+        VExpr* new_node = _pool->add(new VcompoundPred(texpr_node));
+        new_node->add_child(last_expr);
+        DCHECK((vexprs[j])->get_impl() != nullptr);
+        new_node->add_child(vexprs[j]);
+        last_expr = new_node;
+        _rf_vexpr_set.insert(vexprs[j]);
+    }
+    auto new_vconjunct_ctx_ptr = _pool->add(new VExprContext(last_expr));
+    if (_vconjunct_ctx_ptr) {
+        (*_vconjunct_ctx_ptr)->clone_fn_contexts(new_vconjunct_ctx_ptr);
+    }
+    RETURN_IF_ERROR(new_vconjunct_ctx_ptr->prepare(_state, row_desc()));
+    RETURN_IF_ERROR(new_vconjunct_ctx_ptr->open(_state));
+    if (_vconjunct_ctx_ptr) {
+        (*(_vconjunct_ctx_ptr.get()))->mark_as_stale();
+        _stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr));
+    }
+    _vconjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*);
+    *(_vconjunct_ctx_ptr.get()) = new_vconjunct_ctx_ptr;
+    return Status::OK();
+}
+
+// Shut the node down: stop the scanner scheduler (if one was created), close the
+// runtime filters consumed by this node, close every expression context that was
+// retired into _stale_vexpr_ctxs, and finally close the base ExecNode.
+// Calling close() on an already-closed node is a no-op.
+Status VScanNode::close(RuntimeState* state) {
+    if (is_closed()) {
+        return Status::OK();
+    }
+    START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::close");
+
+    // _scanner_ctx may not be created for some short circuit case.
+    if (_scanner_ctx != nullptr) {
+        // Ask the scheduler to stop, then wait for it to be done.
+        _scanner_ctx->set_should_stop();
+        _scanner_ctx->clear_and_join();
+    }
+
+    for (auto& rf_ctx : _runtime_filter_ctxs) {
+        rf_ctx.runtime_filter->consumer_close();
+    }
+
+    // Contexts replaced by _append_rf_into_conjuncts / _normalize_conjuncts are no
+    // longer the active conjunct, but they still have to be closed here.
+    for (auto& stale_ctx : _stale_vexpr_ctxs) {
+        (*stale_ctx)->close(state);
+    }
+
+    return ExecNode::close(state);
+}
+
+// Build a ColumnValueRange for every output slot whose type is listed in
+// APPLY_FOR_PRIMITIVE_TYPE, then walk the conjunct tree and fold pushable predicates
+// into those ranges (see _normalize_predicate). If nothing of the conjunct tree is left
+// to evaluate, the conjunct context is marked stale and moved to _stale_vexpr_ctxs.
+// Finally the per-slot ranges are re-indexed by column name into
+// _colname_to_value_range; any range that became empty sets _eos, because no row can
+// satisfy it.
+Status VScanNode::_normalize_conjuncts() {
+    // The conjuncts is always on output tuple, so use _output_tuple_desc;
+    // bind by const reference so the slot list is not copied.
+    const auto& slots = _output_tuple_desc->slots();
+
+    for (SlotDescriptor* slot : slots) {
+        switch (slot->type().type) {
+#define M(NAME)                                                                       \
+    case TYPE_##NAME: {                                                               \
+        ColumnValueRange<TYPE_##NAME> range(slot->col_name(), slot->type().precision, \
+                                            slot->type().scale);                      \
+        _slot_id_to_value_range[slot->id()] = std::pair {slot, range};                \
+        break;                                                                        \
+    }
+#define APPLY_FOR_PRIMITIVE_TYPE(M) \
+    M(TINYINT)                      \
+    M(SMALLINT)                     \
+    M(INT)                          \
+    M(BIGINT)                       \
+    M(LARGEINT)                     \
+    M(CHAR)                         \
+    M(DATE)                         \
+    M(DATETIME)                     \
+    M(DATEV2)                       \
+    M(DATETIMEV2)                   \
+    M(VARCHAR)                      \
+    M(STRING)                       \
+    M(HLL)                          \
+    M(DECIMAL32)                    \
+    M(DECIMAL64)                    \
+    M(DECIMAL128)                   \
+    M(DECIMALV2)                    \
+    M(BOOLEAN)
+            APPLY_FOR_PRIMITIVE_TYPE(M)
+// Both helper macros are local to this switch; do not leak them into the rest of the TU.
+#undef APPLY_FOR_PRIMITIVE_TYPE
+#undef M
+        default: {
+            VLOG_CRITICAL << "Unsupported Normalize Slot [ColName=" << slot->col_name()
+                          << "]";
+            break;
+        }
+        }
+    }
+
+    if (_vconjunct_ctx_ptr && (*_vconjunct_ctx_ptr)->root()) {
+        VExpr* new_root = _normalize_predicate((*_vconjunct_ctx_ptr)->root());
+        if (new_root) {
+            (*_vconjunct_ctx_ptr)->set_root(new_root);
+        } else {
+            // Nothing is left to evaluate: retire the context so close() releases it.
+            (*(_vconjunct_ctx_ptr.get()))->mark_as_stale();
+            _stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr));
+            _vconjunct_ctx_ptr.reset(nullptr);
+        }
+    }
+
+    for (auto& entry : _slot_id_to_value_range) {
+        auto& value_range = entry.second.second;
+        std::visit(
+                [&](auto&& range) {
+                    if (range.is_empty_value_range()) {
+                        _eos = true;
+                    }
+                },
+                value_range);
+        _colname_to_value_range[entry.second.first->col_name()] = value_range;
+    }
+
+    return Status::OK();
+}
+
+// Recursively walk a conjunct tree and try to push each leaf predicate down into the
+// per-slot ColumnValueRange (or into the bloom-filter / function-filter lists).
+//
+// AND nodes are descended into; every other node is treated as a leaf. A leaf is
+// removed from the tree (nullptr is returned for it) only when it was pushed down AND
+// it references a key column; otherwise it is kept so it is still evaluated.
+// When one side of an AND is removed, the AND node itself is closed (its children are
+// detached first) and the surviving side is returned in its place.
+//
+// Returns the (possibly rewritten) root, or nullptr if nothing is left to evaluate.
+VExpr* VScanNode::_normalize_predicate(VExpr* conjunct_expr_root) {
+    static constexpr auto is_leaf = [](VExpr* expr) { return !expr->is_and_expr(); };
+    // Accepts predicates whose first child is a (possibly cast) slot ref, e.g. `col IN (...)`.
+    auto in_predicate_checker = [](const std::vector<VExpr*>& children, const VSlotRef** slot,
+                                   VExpr** child_contains_slot) {
+        if (children.empty() ||
+            VExpr::expr_without_cast(children[0])->node_type() != TExprNodeType::SLOT_REF) {
+            // not a slot ref(column)
+            return false;
+        }
+        *slot = reinterpret_cast<const VSlotRef*>(VExpr::expr_without_cast(children[0]));
+        *child_contains_slot = children[0];
+        return true;
+    };
+    // Accepts predicates where any child is a (possibly cast) slot ref; the first such
+    // child wins.
+    auto eq_predicate_checker = [](const std::vector<VExpr*>& children, const VSlotRef** slot,
+                                   VExpr** child_contains_slot) {
+        for (const VExpr* child : children) {
+            if (VExpr::expr_without_cast(child)->node_type() != TExprNodeType::SLOT_REF) {
+                // not a slot ref(column)
+                continue;
+            }
+            *slot = reinterpret_cast<const VSlotRef*>(VExpr::expr_without_cast(child));
+            *child_contains_slot = const_cast<VExpr*>(child);
+            return true;
+        }
+        return false;
+    };
+
+    if (conjunct_expr_root != nullptr) {
+        if (is_leaf(conjunct_expr_root)) {
+            auto impl = conjunct_expr_root->get_impl();
+            VExpr* cur_expr = impl ? const_cast<VExpr*>(impl) : conjunct_expr_root;
+            // Must start as nullptr: when the predicate is settled by constant evaluation
+            // below, the slot lookup is skipped and `slot` is never assigned.
+            SlotDescriptor* slot = nullptr;
+            ColumnValueRangeType* range = nullptr;
+            bool push_down = false;
+            _eval_const_conjuncts(cur_expr, *(_vconjunct_ctx_ptr.get()), &push_down);
+            if (!push_down &&
+                (_is_predicate_acting_on_slot(cur_expr, in_predicate_checker, &slot, &range) ||
+                 _is_predicate_acting_on_slot(cur_expr, eq_predicate_checker, &slot, &range))) {
+                // Try each normalizer in turn; RETURN_IF_PUSH_DOWN (defined elsewhere)
+                // presumably leaves the lambda once one of them pushes the predicate down.
+                std::visit(
+                        [&](auto& value_range) {
+                            RETURN_IF_PUSH_DOWN(_normalize_in_and_eq_predicate(
+                                    cur_expr, *(_vconjunct_ctx_ptr.get()), slot, value_range,
+                                    &push_down));
+                            RETURN_IF_PUSH_DOWN(_normalize_not_in_and_not_eq_predicate(
+                                    cur_expr, *(_vconjunct_ctx_ptr.get()), slot, value_range,
+                                    &push_down));
+                            RETURN_IF_PUSH_DOWN(_normalize_is_null_predicate(
+                                    cur_expr, *(_vconjunct_ctx_ptr.get()), slot, value_range,
+                                    &push_down));
+                            RETURN_IF_PUSH_DOWN(_normalize_noneq_binary_predicate(
+                                    cur_expr, *(_vconjunct_ctx_ptr.get()), slot, value_range,
+                                    &push_down));
+                            if (_is_key_column(slot->col_name())) {
+                                RETURN_IF_PUSH_DOWN(_normalize_bloom_filter(
+                                        cur_expr, *(_vconjunct_ctx_ptr.get()), slot, &push_down));
+                                if (_state->enable_function_pushdown()) {
+                                    RETURN_IF_PUSH_DOWN(_normalize_function_filters(
+                                            cur_expr, *(_vconjunct_ctx_ptr.get()), slot,
+                                            &push_down));
+                                }
+                            }
+                        },
+                        *range);
+            }
+            // Only a pushed-down predicate on a key column is removed. A predicate settled
+            // by constant evaluation has no slot (and _eos is already set), so it is kept
+            // instead of dereferencing a null slot.
+            if (push_down && slot != nullptr && _is_key_column(slot->col_name())) {
+                return nullptr;
+            } else {
+                return conjunct_expr_root;
+            }
+        } else {
+            VExpr* left_child = _normalize_predicate(conjunct_expr_root->children()[0]);
+            VExpr* right_child = _normalize_predicate(conjunct_expr_root->children()[1]);
+
+            if (left_child != nullptr && right_child != nullptr) {
+                conjunct_expr_root->set_children({left_child, right_child});
+                return conjunct_expr_root;
+            } else {
+                // here only close the and expr self, do not close the child
+                conjunct_expr_root->set_children({});
+                conjunct_expr_root->close(_state, *_vconjunct_ctx_ptr,
+                                          (*_vconjunct_ctx_ptr)->get_function_state_scope());
+            }
+
+            // here do not close Expr* now
+            return left_child != nullptr ? left_child : right_child;
+        }
+    }
+    return conjunct_expr_root;
+}
+
+// If `expr` is a BLOOM_PRED node, append (column name, its bloom filter function) to
+// _bloom_filters_push_down and report the predicate as pushed down.
+// Any other expression is left untouched.
+Status VScanNode::_normalize_bloom_filter(VExpr* expr, VExprContext* expr_ctx, SlotDescriptor* slot,
+                                          bool* push_down) {
+    if (expr->node_type() != TExprNodeType::BLOOM_PRED) {
+        return Status::OK();
+    }
+    DCHECK(expr->children().size() == 1);
+    _bloom_filters_push_down.emplace_back(slot->col_name(), expr->get_bloom_filter_func());
+    *push_down = true;
+    return Status::OK();
+}
+
+// Try to push a function-call predicate, optionally wrapped in a `not` compound
+// predicate, down as a function filter on the slot's column. When
+// _should_push_down_function_filter accepts it, an entry
+// (negated, column name, function context, value) is appended to _push_down_functions
+// and *push_down is set.
+Status VScanNode::_normalize_function_filters(VExpr* expr, VExprContext* expr_ctx,
+                                              SlotDescriptor* slot, bool* push_down) {
+    // Peel off a leading `not` and remember that the filter must be inverted.
+    const bool is_negated = TExprNodeType::COMPOUND_PRED == expr->node_type() &&
+                            expr->fn().name.function_name == "not";
+    VExpr* fn_expr = is_negated ? expr->children()[0] : expr;
+
+    if (TExprNodeType::FUNCTION_CALL != fn_expr->node_type()) {
+        return Status::OK();
+    }
+
+    doris_udf::FunctionContext* fn_ctx = nullptr;
+    StringVal val;
+    if (!_should_push_down_function_filter(reinterpret_cast<VectorizedFnCall*>(fn_expr), expr_ctx,
+                                           &val, &fn_ctx)) {
+        return Status::OK();
+    }
+    _push_down_functions.emplace_back(is_negated, slot->col_name(), fn_ctx, val);
+    *push_down = true;
+    return Status::OK();
+}
+
+// Decide whether `expr` constrains a single slot that has a tracked value range.
+//
+// `checker` inspects expr's children and, on success, reports the slot ref and the
+// child expression that contains it. Returns true and sets *slot_desc / *range to the
+// slot descriptor and its entry in _slot_id_to_value_range when:
+//   - the checker accepts the children,
+//   - the slot has an entry in _slot_id_to_value_range, and
+//   - the child's type equals the slot's type, or ignore_cast() allows the cast.
+// Note: *slot_desc is written before the type check, so it may be set even when this
+// returns false.
+bool VScanNode::_is_predicate_acting_on_slot(
+        VExpr* expr,
+        const std::function<bool(const std::vector<VExpr*>&, const VSlotRef**, VExpr**)>& checker,
+        SlotDescriptor** slot_desc, ColumnValueRangeType** range) {
+    const VSlotRef* slot_ref = nullptr;
+    VExpr* slot_child = nullptr;
+    if (!checker(expr->children(), &slot_ref, &slot_child)) {
+        // not a slot ref(column)
+        return false;
+    }
+
+    auto found = _slot_id_to_value_range.find(slot_ref->slot_id());
+    if (found == _slot_id_to_value_range.end()) {
+        return false;
+    }
+
+    *slot_desc = found->second.first;
+    DCHECK(slot_child != nullptr);
+    const bool type_mismatch = slot_child->type().type != (*slot_desc)->type().type;
+    if (type_mismatch && !ignore_cast(*slot_desc, slot_child)) {
+        // the type of predicate not match the slot's type
+        return false;
+    }
+    *range = &(found->second.second);
+    return true;
+}
+
+// If `vexpr` is a constant expression, inspect its single value. If the data pointer is
+// null (presumably a NULL constant) or the value reads as false, no row can match, so
+// *push_down and _eos are both set. A constant expression that does not yield a
+// ColumnConst is only logged.
+void VScanNode::_eval_const_conjuncts(VExpr* vexpr, VExprContext* expr_ctx, bool* push_down) {
+    if (!vexpr->is_constant()) {
+        return;
+    }
+    const ColumnConst* const_column =
+            check_and_get_column<ColumnConst>(vexpr->get_const_col(expr_ctx)->column_ptr);
+    if (const_column == nullptr) {
+        LOG(WARNING) << "Expr[" << vexpr->debug_string()
+                     << "] is a constant but doesn't contain a const column!";
+        return;
+    }
+    const char* constant_val = const_column->get_data_at(0).data;
+    const bool never_true =
+            constant_val == nullptr || *reinterpret_cast<const bool*>(constant_val) == false;
+    if (never_true) {
+        *push_down = true;
+        _eos = true;
+    }
+}
+
+// Fold `col IN (v1, v2, ...)` and `col = v` predicates into the slot's value range.
+//
+// Matching values are collected into a fixed-value (IN-list) range `temp_range`, which
+// is then intersected into `range`.
+// Outcome of the final limit check:
+//   - more than _max_pushdown_conditions_per_column fixed values: `range` is reset to
+//     the whole value range and *push_down is left unchanged, so the predicate stays
+//     in the conjunct tree;
+//   - otherwise: *push_down is set to whether this expression contributed to `range`.
+// For any other kind of expression `range` is not intersected, but the limit check
+// below still runs.
+template <PrimitiveType T>
+Status VScanNode::_normalize_in_and_eq_predicate(VExpr* expr, VExprContext* expr_ctx,
+                                                 SlotDescriptor* slot, ColumnValueRange<T>& range,
+                                                 bool* push_down) {
+    auto temp_range = ColumnValueRange<T>::create_empty_column_value_range(slot->type().precision,
+                                                                           slot->type().scale);
+    bool effect = false;
+    // 1. Normalize in conjuncts like 'where col in (v1, v2, v3)'
+    if (TExprNodeType::IN_PRED == expr->node_type()) {
+        VInPredicate* pred = static_cast<VInPredicate*>(expr);
+        // `false` selects the IN (not NOT IN) form; if rejected, nothing is changed.
+        if (!_should_push_down_in_predicate(pred, expr_ctx, false)) {
+            return Status::OK();
+        }
+
+        // begin to push InPredicate value into ColumnValueRange
+        InState* state = reinterpret_cast<InState*>(
+                expr_ctx->fn_context(pred->fn_context_index())
+                        ->get_function_state(FunctionContext::FRAGMENT_LOCAL));
+        HybridSetBase::IteratorBase* iter = state->hybrid_set->begin();
+        auto fn_name = std::string("");
+        while (iter->has_next()) {
+            // column in (nullptr) is always false so continue to
+            // dispose next item
+            if (nullptr == iter->get_value()) {
+                iter->next();
+                continue;
+            }
+            auto value = const_cast<void*>(iter->get_value());
+            // The last argument tells _change_value_range whether a DATEV2 value must be
+            // read as a DATETIMEV2 (presumably when the set was not built as date_v2 —
+            // confirm against HybridSet).
+            RETURN_IF_ERROR(_change_value_range<true>(temp_range, value,
+                                                      ColumnValueRange<T>::add_fixed_value_range,
+                                                      fn_name, !state->hybrid_set->is_date_v2()));
+            iter->next();
+        }
+
+        range.intersection(temp_range);
+        effect = true;
+    } else if (TExprNodeType::BINARY_PRED == expr->node_type()) {
+        DCHECK(expr->children().size() == 2);
+        // Restrict this branch to `eq` comparisons.
+        auto eq_checker = [](const std::string& fn_name) { return fn_name == "eq"; };
+
+        StringRef value;
+        int slot_ref_child = -1;
+        if (_should_push_down_binary_predicate(reinterpret_cast<VectorizedFnCall*>(expr), expr_ctx,
+                                               &value, &slot_ref_child, eq_checker)) {
+            DCHECK(slot_ref_child >= 0);
+            // A NULL comparand (value.data == nullptr) is not folded into the range:
+            // `effect` stays false and the predicate is left for normal evaluation.
+            auto fn_name = std::string("");
+            if (value.data != nullptr) {
+                // String-like types are wrapped in a StringValue; all other types pass
+                // the raw literal bytes through.
+                if constexpr (T == TYPE_CHAR || T == TYPE_VARCHAR || T == TYPE_STRING ||
+                              T == TYPE_HLL) {
+                    auto val = StringValue(value.data, value.size);
+                    RETURN_IF_ERROR(_change_value_range<true>(
+                            temp_range, reinterpret_cast<void*>(&val),
+                            ColumnValueRange<T>::add_fixed_value_range, fn_name));
+                } else {
+                    RETURN_IF_ERROR(_change_value_range<true>(
+                            temp_range, reinterpret_cast<void*>(const_cast<char*>(value.data)),
+                            ColumnValueRange<T>::add_fixed_value_range, fn_name));
+                }
+                range.intersection(temp_range);
+                effect = true;
+            }
+        }
+    }
+
+    // exceed limit, no conditions will be pushed down to storage engine.
+    if (range.get_fixed_value_size() > _max_pushdown_conditions_per_column) {
+        range.set_whole_value_range();
+    } else {
+        *push_down = effect;
+    }
+    return Status::OK();
+}
+
+// Fold `col NOT IN (...)` and `col != v` predicates into the slot's value range.
+//
+// Where the excluded values go depends on the current `range`:
+//   - already a fixed-value (IN-list) range: the values are removed from it directly;
+//   - otherwise: they are collected into `not_in_range`. If that set has at most
+//     _max_pushdown_conditions_per_column values, it is appended to _olap_filters as a
+//     NOT IN condition for the storage engine.
+// *push_down is assigned `effect` (whether this expression contributed any value) only
+// in the two cases above; when the NOT IN set is too large it is left unchanged.
+template <PrimitiveType T>
+Status VScanNode::_normalize_not_in_and_not_eq_predicate(VExpr* expr, VExprContext* expr_ctx,
+                                                         SlotDescriptor* slot,
+                                                         ColumnValueRange<T>& range,
+                                                         bool* push_down) {
+    bool is_fixed_range = range.is_fixed_value_range();
+    auto not_in_range = ColumnValueRange<T>::create_empty_column_value_range(range.column_name());
+    bool effect = false;
+    // 1. Normalize not in conjuncts like 'where col not in (v1, v2, v3)'
+    if (TExprNodeType::IN_PRED == expr->node_type()) {
+        VInPredicate* pred = static_cast<VInPredicate*>(expr);
+        if (!_should_push_down_in_predicate(pred, expr_ctx, true)) {
+            return Status::OK();
+        }
+
+        // begin to push InPredicate value into ColumnValueRange
+        InState* state = reinterpret_cast<InState*>(
+                expr_ctx->fn_context(pred->fn_context_index())
+                        ->get_function_state(FunctionContext::FRAGMENT_LOCAL));
+        HybridSetBase::IteratorBase* iter = state->hybrid_set->begin();
+        auto fn_name = std::string("");
+        while (iter->has_next()) {
+            // Skip null elements.
+            // NOTE(review): under SQL three-valued logic `col NOT IN (..., NULL)` is never
+            // TRUE, so skipping is only safe if the hybrid set never yields null
+            // elements here — confirm.
+            if (nullptr == iter->get_value()) {
+                // Advance before skipping; without this the loop never terminates once a
+                // null element is reached (matches the IN-predicate loop).
+                iter->next();
+                continue;
+            }
+            auto value = const_cast<void*>(iter->get_value());
+            if (is_fixed_range) {
+                RETURN_IF_ERROR(_change_value_range<true>(
+                        range, value, ColumnValueRange<T>::remove_fixed_value_range, fn_name,
+                        !state->hybrid_set->is_date_v2()));
+            } else {
+                RETURN_IF_ERROR(_change_value_range<true>(
+                        not_in_range, value, ColumnValueRange<T>::add_fixed_value_range, fn_name,
+                        !state->hybrid_set->is_date_v2()));
+            }
+            iter->next();
+        }
+        effect = true;
+    } else if (TExprNodeType::BINARY_PRED == expr->node_type()) {
+        DCHECK(expr->children().size() == 2);
+
+        auto ne_checker = [](const std::string& fn_name) { return fn_name == "ne"; };
+        StringRef value;
+        int slot_ref_child = -1;
+        if (_should_push_down_binary_predicate(reinterpret_cast<VectorizedFnCall*>(expr), expr_ctx,
+                                               &value, &slot_ref_child, ne_checker)) {
+            DCHECK(slot_ref_child >= 0);
+            // A NULL comparand (`col != NULL`) is not folded into any range; `effect`
+            // stays false.
+            if (value.data != nullptr) {
+                auto fn_name = std::string("");
+                if constexpr (T == TYPE_CHAR || T == TYPE_VARCHAR || T == TYPE_STRING ||
+                              T == TYPE_HLL) {
+                    auto val = StringValue(value.data, value.size);
+                    if (is_fixed_range) {
+                        RETURN_IF_ERROR(_change_value_range<true>(
+                                range, reinterpret_cast<void*>(&val),
+                                ColumnValueRange<T>::remove_fixed_value_range, fn_name));
+                    } else {
+                        RETURN_IF_ERROR(_change_value_range<true>(
+                                not_in_range, reinterpret_cast<void*>(&val),
+                                ColumnValueRange<T>::add_fixed_value_range, fn_name));
+                    }
+                } else {
+                    if (is_fixed_range) {
+                        RETURN_IF_ERROR(_change_value_range<true>(
+                                range, reinterpret_cast<void*>(const_cast<char*>(value.data)),
+                                ColumnValueRange<T>::remove_fixed_value_range, fn_name));
+                    } else {
+                        RETURN_IF_ERROR(_change_value_range<true>(
+                                not_in_range,
+                                reinterpret_cast<void*>(const_cast<char*>(value.data)),
+                                ColumnValueRange<T>::add_fixed_value_range, fn_name));
+                    }
+                }
+                effect = true;
+            }
+        }
+    }
+
+    if (is_fixed_range ||
+        not_in_range.get_fixed_value_size() <= _max_pushdown_conditions_per_column) {
+        if (!is_fixed_range) {
+            // push down not in condition to storage engine
+            not_in_range.to_in_condition(_olap_filters, false);
+        }
+        *push_down = effect;
+    }
+    return Status::OK();
+}
+
+// Fold `col IS NULL` / `col IS NOT NULL` into the slot's value range. These are function
+// calls named "is_null_pred" / "is_not_null_pred". The range is intersected with an
+// empty range whose null-containment flag is set accordingly, and *push_down is set.
+// Any other expression is ignored.
+template <PrimitiveType T>
+Status VScanNode::_normalize_is_null_predicate(VExpr* expr, VExprContext* expr_ctx,
+                                               SlotDescriptor* slot, ColumnValueRange<T>& range,
+                                               bool* push_down) {
+    if (expr->node_type() != TExprNodeType::FUNCTION_CALL) {
+        return Status::OK();
+    }
+
+    const std::string& fn_name =
+            reinterpret_cast<VectorizedFnCall*>(expr)->fn().name.function_name;
+    bool keep_null;
+    if (fn_name == "is_null_pred") {
+        keep_null = true;
+    } else if (fn_name == "is_not_null_pred") {
+        keep_null = false;
+    } else {
+        return Status::OK();
+    }
+
+    auto null_range = ColumnValueRange<T>::create_empty_column_value_range(slot->type().precision,
+                                                                           slot->type().scale);
+    null_range.set_contain_null(keep_null);
+    range.intersection(null_range);
+    *push_down = true;
+    return Status::OK();
+}
+
+// Fold a binary comparison whose function is neither `eq` nor `ne` (e.g. `lt`, `ge`)
+// into the slot's value range via ColumnValueRange<T>::add_value_range. The function
+// name and the side on which the slot appears (slot_ref_child) are passed on to
+// _change_value_range. A NULL comparand is ignored and leaves *push_down untouched.
+template <PrimitiveType T>
+Status VScanNode::_normalize_noneq_binary_predicate(VExpr* expr, VExprContext* expr_ctx,
+                                                    SlotDescriptor* slot,
+                                                    ColumnValueRange<T>& range, bool* push_down) {
+    if (expr->node_type() != TExprNodeType::BINARY_PRED) {
+        return Status::OK();
+    }
+    DCHECK(expr->children().size() == 2);
+
+    auto* fn_call = reinterpret_cast<VectorizedFnCall*>(expr);
+    auto is_range_comparison = [](const std::string& fn_name) {
+        return fn_name != "ne" && fn_name != "eq";
+    };
+    StringRef literal;
+    int slot_side = -1;
+    if (!_should_push_down_binary_predicate(fn_call, expr_ctx, &literal, &slot_side,
+                                            is_range_comparison)) {
+        return Status::OK();
+    }
+    DCHECK(slot_side >= 0);
+
+    if (literal.data == nullptr) {
+        return Status::OK();
+    }
+
+    const std::string& fn_name = fn_call->fn().name.function_name;
+    *push_down = true;
+    if constexpr (T == TYPE_CHAR || T == TYPE_VARCHAR || T == TYPE_STRING || T == TYPE_HLL) {
+        // String-like values are wrapped in a StringValue before being passed on.
+        auto str = StringValue(literal.data, literal.size);
+        RETURN_IF_ERROR(_change_value_range<false>(range, reinterpret_cast<void*>(&str),
+                                                   ColumnValueRange<T>::add_value_range, fn_name,
+                                                   true, slot_side));
+    } else {
+        RETURN_IF_ERROR(_change_value_range<false>(
+                range, reinterpret_cast<void*>(const_cast<char*>(literal.data)),
+                ColumnValueRange<T>::add_value_range, fn_name, true, slot_side));
+    }
+    return Status::OK();
+}
+
+// Convert a raw predicate value into the C++ type that ColumnValueRange<PrimitiveType>
+// stores, then apply `func` to `temp_range` with it. In this file `value` comes from a
+// hybrid-set element or a binary-predicate literal.
+//
+// The call made depends on IsFixed:
+//   - true:  func(temp_range, value_ptr). Used with add_fixed_value_range /
+//            remove_fixed_value_range.
+//   - false: func(temp_range, to_olap_filter_type(fn_name, slot_ref_child), value_ptr).
+//            Used with add_value_range.
+// Unsupported PrimitiveTypes fail at compile time via static_assert.
+template <bool IsFixed, PrimitiveType PrimitiveType, typename ChangeFixedValueRangeFunc>
+Status VScanNode::_change_value_range(ColumnValueRange<PrimitiveType>& temp_range, void* value,
+                                      const ChangeFixedValueRangeFunc& func,
+                                      const std::string& fn_name, bool cast_date_to_datetime,
+                                      int slot_ref_child) {
+    if constexpr (PrimitiveType == TYPE_DATE) {
+        // `value` is read as a VecDateTimeValue and converted to DateTimeValue.
+        DateTimeValue date_value;
+        reinterpret_cast<VecDateTimeValue*>(value)->convert_vec_dt_to_dt(&date_value);
+        if constexpr (IsFixed) {
+            // Values that would lose accuracy when cast to a date are not added
+            // (presumably because a DATE column can never equal them — confirm).
+            if (!date_value.check_loss_accuracy_cast_to_date()) {
+                func(temp_range,
+                     reinterpret_cast<typename PrimitiveTypeTraits<PrimitiveType>::CppType*>(
+                             &date_value));
+            }
+        } else {
+            // For a lossy value, `lt` / `ge` bounds are incremented. Presumably this moves
+            // the bound to the next day so truncating the time part keeps the same set of
+            // matching dates — TODO confirm what `++` advances.
+            if (date_value.check_loss_accuracy_cast_to_date()) {
+                if (fn_name == "lt" || fn_name == "ge") {
+                    ++date_value;
+                }
+            }
+            func(temp_range, to_olap_filter_type(fn_name, slot_ref_child),
+                 reinterpret_cast<typename PrimitiveTypeTraits<PrimitiveType>::CppType*>(
+                         &date_value));
+        }
+    } else if constexpr (PrimitiveType == TYPE_DATETIME) {
+        // `value` is read as a VecDateTimeValue and converted to DateTimeValue.
+        DateTimeValue date_value;
+        reinterpret_cast<VecDateTimeValue*>(value)->convert_vec_dt_to_dt(&date_value);
+        if constexpr (IsFixed) {
+            func(temp_range,
+                 reinterpret_cast<typename PrimitiveTypeTraits<PrimitiveType>::CppType*>(
+                         &date_value));
+        } else {
+            func(temp_range, to_olap_filter_type(fn_name, slot_ref_child),
+                 reinterpret_cast<typename PrimitiveTypeTraits<PrimitiveType>::CppType*>(
+                         reinterpret_cast<char*>(&date_value)));
+        }
+    } else if constexpr (PrimitiveType == TYPE_DATEV2) {
+        if (cast_date_to_datetime) {
+            // `value` holds a DateTimeV2; its date part is taken from the bits above
+            // TIME_PART_LENGTH.
+            DateV2Value<DateTimeV2ValueType> datetimev2_value =
+                    *reinterpret_cast<DateV2Value<DateTimeV2ValueType>*>(value);
+            if constexpr (IsFixed) {
+                // Only values without a time part can equal a DATEV2 value.
+                if (datetimev2_value.can_cast_to_date_without_loss_accuracy()) {
+                    DateV2Value<DateV2ValueType> date_v2;
+                    date_v2.set_date_uint32(binary_cast<DateV2Value<DateTimeV2ValueType>, uint64_t>(
+                                                    datetimev2_value) >>
+                                            TIME_PART_LENGTH);
+                    func(temp_range, &date_v2);
+                }
+            } else {
+                doris::vectorized::DateV2Value<DateV2ValueType> date_v2;
+                date_v2.set_date_uint32(
+                        binary_cast<DateV2Value<DateTimeV2ValueType>, uint64_t>(datetimev2_value) >>
+                        TIME_PART_LENGTH);
+                // Same `lt` / `ge` adjustment as the TYPE_DATE branch above.
+                if (!datetimev2_value.can_cast_to_date_without_loss_accuracy()) {
+                    if (fn_name == "lt" || fn_name == "ge") {
+                        ++date_v2;
+                    }
+                }
+                func(temp_range, to_olap_filter_type(fn_name, slot_ref_child), &date_v2);
+            }
+        } else {
+            // `value` already holds the DATEV2 representation; pass it through.
+            if constexpr (IsFixed) {
+                func(temp_range,
+                     reinterpret_cast<typename PrimitiveTypeTraits<PrimitiveType>::CppType*>(
+                             value));
+            } else {
+                func(temp_range, to_olap_filter_type(fn_name, slot_ref_child),
+                     reinterpret_cast<typename PrimitiveTypeTraits<PrimitiveType>::CppType*>(
+                             value));
+            }
+        }
+    } else if constexpr ((PrimitiveType == TYPE_DECIMALV2) || (PrimitiveType == TYPE_CHAR) ||
+                         (PrimitiveType == TYPE_VARCHAR) || (PrimitiveType == TYPE_HLL) ||
+                         (PrimitiveType == TYPE_DATETIMEV2) || (PrimitiveType == TYPE_TINYINT) ||
+                         (PrimitiveType == TYPE_SMALLINT) || (PrimitiveType == TYPE_INT) ||
+                         (PrimitiveType == TYPE_BIGINT) || (PrimitiveType == TYPE_LARGEINT) ||
+                         (PrimitiveType == TYPE_DECIMAL32) || (PrimitiveType == TYPE_DECIMAL64) ||
+                         (PrimitiveType == TYPE_DECIMAL128) || (PrimitiveType == TYPE_STRING) ||
+                         (PrimitiveType == TYPE_BOOLEAN)) {
+        // No conversion needed: `value` is reinterpreted as the slot's CppType directly.
+        if constexpr (IsFixed) {
+            func(temp_range,
+                 reinterpret_cast<typename PrimitiveTypeTraits<PrimitiveType>::CppType*>(value));
+        } else {
+            func(temp_range, to_olap_filter_type(fn_name, slot_ref_child),
+                 reinterpret_cast<typename PrimitiveTypeTraits<PrimitiveType>::CppType*>(value));
+        }
+    } else {
+        static_assert(always_false_v<PrimitiveType>);
+    }
+
+    return Status::OK();
+}
+
+// Apply runtime filters that have become ready since the last check (presumably those
+// that were not ready when the node was opened) to the scan's conjuncts.
+//
+// Under _rf_locks, each filter that is ready but not yet applied contributes its
+// prepared expressions and is marked as applied. The collected expressions are then
+// merged into the conjunct context by _append_rf_into_conjuncts. Once every filter is
+// applied, _is_all_rf_applied short-circuits later calls.
+// *arrived_rf_num receives the number of filters applied so far.
+Status VScanNode::try_append_late_arrival_runtime_filter(int* arrived_rf_num) {
+    // Fast path: nothing left to apply.
+    if (_is_all_rf_applied) {
+        *arrived_rf_num = _runtime_filter_descs.size();
+        return Status::OK();
+    }
+
+    // This method will be called in scanner thread.
+    // So need to add lock
+    std::unique_lock<std::mutex> l(_rf_locks);
+    // Re-check under the lock: another scanner thread may have finished the work.
+    if (_is_all_rf_applied) {
+        *arrived_rf_num = _runtime_filter_descs.size();
+        return Status::OK();
+    }
+
+    // 1. Collect expressions from filters that are ready but not yet applied.
+    std::vector<VExpr*> new_rf_exprs;
+    int applied_count = 0;
+    for (size_t idx = 0; idx < _runtime_filter_descs.size(); ++idx) {
+        auto& rf_ctx = _runtime_filter_ctxs[idx];
+        if (!rf_ctx.apply_mark) {
+            if (!rf_ctx.runtime_filter->is_ready()) {
+                continue;
+            }
+            rf_ctx.runtime_filter->get_prepared_vexprs(&new_rf_exprs, row_desc());
+            rf_ctx.apply_mark = true;
+        }
+        ++applied_count;
+    }
+
+    // 2. Append unapplied runtime filters to vconjunct_ctx_ptr
+    if (!new_rf_exprs.empty()) {
+        RETURN_IF_ERROR(_append_rf_into_conjuncts(new_rf_exprs));
+    }
+    if (applied_count == _runtime_filter_descs.size()) {
+        _is_all_rf_applied = true;
+    }
+
+    *arrived_rf_num = applied_count;
+    return Status::OK();
+}
+
+// Clone the current conjunct context into *_vconjunct_ctx (presumably for a scanner's
+// own use). *_vconjunct_ctx is left untouched when the node has no conjuncts.
+//
+// _vconjunct_ctx_ptr is replaced by _append_rf_into_conjuncts, which
+// try_append_late_arrival_runtime_filter calls from scanner threads while holding
+// _rf_locks. The lock must therefore be held while the pointer is tested, not only
+// while it is dereferenced.
+Status VScanNode::clone_vconjunct_ctx(VExprContext** _vconjunct_ctx) {
+    std::unique_lock<std::mutex> l(_rf_locks);
+    if (_vconjunct_ctx_ptr) {
+        return (*_vconjunct_ctx_ptr)->clone(_state, _vconjunct_ctx);
+    }
+    return Status::OK();
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h
new file mode 100644
index 0000000000..292234162e
--- /dev/null
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -0,0 +1,246 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exec/exec_node.h"
+#include "exec/olap_common.h"
+#include "exprs/function_filter.h"
+#include "exprs/runtime_filter.h"
+#include "vec/exec/scan/scanner_context.h"
+#include "vec/exprs/vectorized_fn_call.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vin_predicate.h"
+
+namespace doris::vectorized {
+
+class VScanner;
+class VSlotRef;
+
+// Unified vectorized scan node. It owns the logic shared by all data sources:
+// runtime filters, predicate push-down, and scanner creation/scheduling.
+// Subclasses only provide the source-specific hooks (_init_scanners(),
+// _should_push_down_*(), _is_key_column(), _init_profile()).
+class VScanNode : public ExecNode {
+public:
+    VScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
+            : ExecNode(pool, tnode, descs) {}
+    friend class NewOlapScanner;
+
+    Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
+
+    Status prepare(RuntimeState* state) override;
+
+    Status open(RuntimeState* state) override;
+
+    virtual void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {}
+
+    // The row-batch interface is not supported by this vectorized node.
+    Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override {
+        return Status::NotSupported("Not implement");
+    }
+
+    // Get next block.
+    // If eos is true, no more data will be read and block should be empty.
+    Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override;
+
+    Status close(RuntimeState* state) override;
+
+    void set_no_agg_finalize() { _need_agg_finalize = false; }
+
+    // Try to append late-arrived runtime filters to the conjuncts.
+    // Called from scanner threads. On return, *arrived_rf_num holds the number of
+    // runtime filters that are already applied.
+    Status try_append_late_arrival_runtime_filter(int* arrived_rf_num);
+
+    // Clone the current vconjunct_ctx into *_vconjunct_ctx, if it exists.
+    Status clone_vconjunct_ctx(VExprContext** _vconjunct_ctx);
+
+    int runtime_filter_num() const { return static_cast<int>(_runtime_filter_ctxs.size()); }
+
+    TupleId input_tuple_id() const { return _input_tuple_id; }
+    TupleId output_tuple_id() const { return _output_tuple_id; }
+    // May be nullptr for query scan nodes, which only have an output tuple.
+    const TupleDescriptor* input_tuple_desc() const { return _input_tuple_desc; }
+    const TupleDescriptor* output_tuple_desc() const { return _output_tuple_desc; }
+
+protected:
+    // Different data sources register different profiles by implementing this method
+    virtual Status _init_profile() { return Status::OK(); }
+
+    // Process predicates, extract the predicates in the conjuncts that can be pushed down
+    // to the data source, and convert them into common expressions structure ColumnPredicate.
+    // There are currently 3 types of predicates that can be pushed down to data sources:
+    //
+    // 1. Simple predicate, with column on left and constant on right, such as "a=1", "b in (1,2,3)" etc.
+    // 2. Bloom Filter, predicate condition generated by runtime filter
+    // 3. Function Filter, some data sources can accept function conditions, such as "a like 'abc%'"
+    //
+    // Predicates that can be fully processed by the data source will be removed from conjuncts
+    virtual Status _process_conjuncts() {
+        RETURN_IF_ERROR(_normalize_conjuncts());
+        return Status::OK();
+    }
+
+    // Create a list of scanners.
+    // The number of scanners is related to the implementation of the data source,
+    // predicate conditions, and scheduling strategy.
+    // So this method needs to be implemented separately by the subclass of ScanNode.
+    // Finally, a set of scanners that have been prepared are returned.
+    virtual Status _init_scanners(std::list<VScanner*>* scanners) { return Status::OK(); }
+
+    //  Different data sources can implement the following 3 methods to determine whether a predicate
+    //  can be pushed down to the data source.
+    //  3 types:
+    //      1. binary predicate
+    //      2. in/not in predicate
+    //      3. function predicate
+    //  TODO: these interfaces should be changed to become more common.
+    virtual bool _should_push_down_binary_predicate(
+            VectorizedFnCall* fn_call, VExprContext* expr_ctx, StringRef* constant_val,
+            int* slot_ref_child, const std::function<bool(const std::string&)>& fn_checker) {
+        return false;
+    }
+
+    virtual bool _should_push_down_in_predicate(VInPredicate* in_pred, VExprContext* expr_ctx,
+                                                bool is_not_in) {
+        return false;
+    }
+
+    virtual bool _should_push_down_function_filter(VectorizedFnCall* fn_call,
+                                                   VExprContext* expr_ctx, StringVal* constant_str,
+                                                   doris_udf::FunctionContext** fn_ctx) {
+        return false;
+    }
+
+    // Return true if it is a key column.
+    // Only predicate on key column can be pushed down.
+    virtual bool _is_key_column(const std::string& col_name) { return false; }
+
+protected:
+    RuntimeState* _state = nullptr;
+    // For load scan node, there should be both input and output tuple descriptor.
+    // For query scan node, there is only output_tuple_desc.
+    // The descriptors default to nullptr because scanners test the input descriptor
+    // against nullptr to decide whether an input block is needed.
+    TupleId _input_tuple_id = -1;
+    TupleId _output_tuple_id = -1;
+    const TupleDescriptor* _input_tuple_desc = nullptr;
+    const TupleDescriptor* _output_tuple_desc = nullptr;
+
+    // These two values are from query_options
+    int _max_scan_key_num = 0;
+    int _max_pushdown_conditions_per_column = 0;
+
+    // For runtime filters
+    struct RuntimeFilterContext {
+        // set to true if this runtime filter is already applied to vconjunct_ctx_ptr
+        bool apply_mark = false;
+        IRuntimeFilter* runtime_filter = nullptr;
+    };
+    std::vector<RuntimeFilterContext> _runtime_filter_ctxs;
+
+    std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
+    // Set to true if the runtime filter is ready.
+    std::vector<bool> _runtime_filter_ready_flag;
+    // Guards the runtime-filter state and _vconjunct_ctx_ptr against concurrent
+    // scanner threads.
+    std::mutex _rf_locks;
+    std::map<int, RuntimeFilterContext*> _conjunct_id_to_runtime_filter_ctxs;
+    phmap::flat_hash_set<VExpr*> _rf_vexpr_set;
+    // True means all runtime filters are applied to scanners
+    bool _is_all_rf_applied = true;
+
+    // Each scan node generates a ScannerContext to manage all Scanners.
+    // See comments of ScannerContext for more details
+    std::shared_ptr<ScannerContext> _scanner_ctx;
+    // Save all scanner objects.
+    ObjectPool _scanner_pool;
+
+    // indicate this scan node has no more data to return
+    bool _eos = false;
+
+    // Save all bloom filter predicates which may be pushed down to data source.
+    // column name -> bloom filter function
+    std::vector<std::pair<std::string, std::shared_ptr<IBloomFilterFuncBase>>>
+            _bloom_filters_push_down;
+
+    // Save all function predicates which may be pushed down to data source.
+    std::vector<FunctionFilter> _push_down_functions;
+
+    // slot id -> ColumnValueRange
+    // Parsed from conjuncts
+    phmap::flat_hash_map<int, std::pair<SlotDescriptor*, ColumnValueRangeType>>
+            _slot_id_to_value_range;
+    // column -> ColumnValueRange
+    std::map<std::string, ColumnValueRangeType> _colname_to_value_range;
+
+    bool _need_agg_finalize = true;
+
+    // TODO: should be moved to olap scan node?
+    std::vector<TCondition> _olap_filters;
+
+    // Every time vconjunct_ctx_ptr is updated, the old ctx will be stored in this vector
+    // so that it will be destroyed uniformly at the end of the query.
+    std::vector<std::unique_ptr<VExprContext*>> _stale_vexpr_ctxs;
+
+    // If sort info is set, push limit to each scanner;
+    int64_t _limit_per_scanner = -1;
+
+private:
+    // Register and get all runtime filters at Init phase.
+    Status _register_runtime_filter();
+    // Get all arrived runtime filters at Open phase.
+    Status _acquire_runtime_filter();
+    // Append late-arrival runtime filters to the vconjunct_ctx.
+    Status _append_rf_into_conjuncts(std::vector<VExpr*>& vexprs);
+
+    Status _normalize_conjuncts();
+    VExpr* _normalize_predicate(VExpr* conjunct_expr_root);
+    void _eval_const_conjuncts(VExpr* vexpr, VExprContext* expr_ctx, bool* push_down);
+
+    Status _normalize_bloom_filter(VExpr* expr, VExprContext* expr_ctx, SlotDescriptor* slot,
+                                   bool* push_down);
+
+    Status _normalize_function_filters(VExpr* expr, VExprContext* expr_ctx, SlotDescriptor* slot,
+                                       bool* push_down);
+
+    bool _is_predicate_acting_on_slot(VExpr* expr,
+                                      const std::function<bool(const std::vector<VExpr*>&,
+                                                               const VSlotRef**, VExpr**)>& checker,
+                                      SlotDescriptor** slot_desc, ColumnValueRangeType** range);
+
+    template <PrimitiveType T>
+    Status _normalize_in_and_eq_predicate(vectorized::VExpr* expr, VExprContext* expr_ctx,
+                                          SlotDescriptor* slot, ColumnValueRange<T>& range,
+                                          bool* push_down);
+    template <PrimitiveType T>
+    Status _normalize_not_in_and_not_eq_predicate(vectorized::VExpr* expr, VExprContext* expr_ctx,
+                                                  SlotDescriptor* slot, ColumnValueRange<T>& range,
+                                                  bool* push_down);
+
+    template <PrimitiveType T>
+    Status _normalize_noneq_binary_predicate(vectorized::VExpr* expr, VExprContext* expr_ctx,
+                                             SlotDescriptor* slot, ColumnValueRange<T>& range,
+                                             bool* push_down);
+
+    template <PrimitiveType T>
+    Status _normalize_is_null_predicate(vectorized::VExpr* expr, VExprContext* expr_ctx,
+                                        SlotDescriptor* slot, ColumnValueRange<T>& range,
+                                        bool* push_down);
+
+    template <bool IsFixed, PrimitiveType PrimitiveType, typename ChangeFixedValueRangeFunc>
+    static Status _change_value_range(ColumnValueRange<PrimitiveType>& range, void* value,
+                                      const ChangeFixedValueRangeFunc& func,
+                                      const std::string& fn_name, bool cast_date_to_datetime = true,
+                                      int slot_ref_child = -1);
+
+    // Submit the scanner to the thread pool and start execution
+    Status _start_scanners(const std::list<VScanner*>& scanners);
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vscanner.cpp 
b/be/src/vec/exec/scan/vscanner.cpp
new file mode 100644
index 0000000000..e154e0e6ba
--- /dev/null
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/scan/vscanner.h"
+
+#include "vec/exec/scan/vscan_node.h"
+
+namespace doris::vectorized {
+
+// Base constructor shared by every scanner. It caches the parent's tuple
+// descriptors and picks the descriptor raw data is read with: the input tuple
+// when the parent has one, otherwise the output tuple. It also records how many
+// runtime filters the parent registered.
+VScanner::VScanner(RuntimeState* state, VScanNode* parent, int64_t limit, MemTracker* mem_tracker)
+        : _state(state),
+          _parent(parent),
+          _limit(limit),
+          _mem_tracker(mem_tracker),
+          _input_tuple_desc(parent->input_tuple_desc()),
+          _output_tuple_desc(parent->output_tuple_desc()),
+          _real_tuple_desc(_input_tuple_desc ? _input_tuple_desc : _output_tuple_desc),
+          _total_rf_num(parent->runtime_filter_num()) {}
+
+// Fill `block` (which must be empty on entry) with the next batch of rows that
+// passed all filters.
+//
+// Keeps pulling from _get_block_impl() until one of these happens:
+// - a non-empty filtered block is produced,
+// - the scanner reports eof,
+// - config::doris_scanner_row_num more raw rows have been read during this call.
+Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) {
+    DCHECK(block->rows() == 0);
+    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
+
+    const int64_t rows_read_limit = raw_rows_read() + config::doris_scanner_row_num;
+
+    // A block that is not being reused has no columns yet: create one per output slot.
+    if (!block->mem_reuse()) {
+        for (auto* slot : _output_tuple_desc->slots()) {
+            block->insert(ColumnWithTypeAndName(slot->get_empty_mutable_column(),
+                                                slot->get_data_type_ptr(), slot->col_name()));
+        }
+    }
+
+    _init_input_block(block);
+    // TODO: account the time spent in this loop in the parent's scan timer and
+    // update the realtime counters.
+    do {
+        // Read raw data (directly into `block` for queries).
+        RETURN_IF_ERROR(_get_block_impl(state, _input_block_ptr, eof));
+        if (*eof) {
+            DCHECK(_input_block_ptr->rows() == 0);
+            break;
+        }
+        _num_rows_read += _input_block_ptr->rows();
+
+        // Load path: pre-filter and convert. Both are no-ops for queries.
+        RETURN_IF_ERROR(_filter_input_block(_input_block_ptr));
+        RETURN_IF_ERROR(_convert_to_output_block(block));
+
+        // Apply the conjuncts; count what survives for the limit check.
+        RETURN_IF_ERROR(_filter_output_block(block));
+        _num_rows_return += block->rows();
+    } while (block->rows() == 0 && !*eof && raw_rows_read() < rows_read_limit);
+
+    return Status::OK();
+}
+
+// Choose the block that raw data is read into.
+// - Without an input tuple descriptor (query scans), the scanner writes straight
+//   into `output_block`.
+// - Otherwise the private `_input_block` is rebuilt with one empty column per
+//   input slot and used instead.
+void VScanner::_init_input_block(Block* output_block) {
+    if (_input_tuple_desc != nullptr) {
+        _input_block.clear();
+        _input_block_ptr = &_input_block;
+        DCHECK(_input_block.columns() == 0);
+
+        for (const auto& slot : _input_tuple_desc->slots()) {
+            const auto column_type = slot->get_data_type_ptr();
+            _input_block.insert(vectorized::ColumnWithTypeAndName(
+                    column_type->create_column(), slot->get_data_type_ptr(), slot->col_name()));
+        }
+    } else {
+        _input_block_ptr = output_block;
+    }
+}
+
+// Hook for pre-filtering the raw input block before conversion (load path).
+// Not implemented yet: the block is left unchanged and OK is returned.
+Status VScanner::_filter_input_block(Block* /*block*/) {
+    // TODO: implement
+    return Status::OK();
+}
+
+// Convert the data read into `_input_block_ptr` into `output_block`.
+// If the scanner read directly into the output block, there is nothing to do.
+// The actual conversion (load path) is not implemented yet, so this currently
+// always succeeds without touching either block.
+Status VScanner::_convert_to_output_block(Block* output_block) {
+    const bool reads_into_output = (_input_block_ptr == output_block);
+    if (!reads_into_output) {
+        // TODO: implement
+    }
+    return Status::OK();
+}
+
+// Evaluate this scanner's conjunct context (SQL predicates plus applied runtime
+// filters) on `block`. The output-slot count is passed as the number of columns
+// to keep; presumably any extra columns added while filtering are dropped
+// (confirm against VExprContext::filter_block).
+Status VScanner::_filter_output_block(Block* block) {
+    const auto num_output_columns = _output_tuple_desc->slots().size();
+    return VExprContext::filter_block(_vconjunct_ctx, block, num_output_columns);
+}
+
+// Refresh this scanner's conjunct context when the parent scan node has applied
+// runtime filters that arrived after the scanner was created.
+//
+// Cheap when nothing changed:
+// - it returns immediately once all filters are applied;
+// - it only clones the parent's conjuncts when the applied count increased.
+// If cloning fails, the scanner keeps its previous, still valid, conjunct context.
+Status VScanner::try_append_late_arrival_runtime_filter() {
+    if (_applied_rf_num == _total_rf_num) {
+        return Status::OK();
+    }
+    DCHECK(_applied_rf_num < _total_rf_num);
+
+    int arrived_rf_num = 0;
+    RETURN_IF_ERROR(_parent->try_append_late_arrival_runtime_filter(&arrived_rf_num));
+
+    if (arrived_rf_num == _applied_rf_num) {
+        // No newly arrived runtime filters, just return.
+        return Status::OK();
+    }
+
+    // There are newly arrived runtime filters: renew the vconjunct_ctx.
+    // Clone into a fresh (null) pointer first and swap afterwards. Discarding the
+    // old ctx before a failed clone would leave the scanner with no conjuncts.
+    VExprContext* new_ctx = nullptr;
+    RETURN_IF_ERROR(_parent->clone_vconjunct_ctx(&new_ctx));
+    _discard_conjuncts();
+    _vconjunct_ctx = new_ctx;
+
+    // The parent may have applied even more runtime filters between the two calls
+    // above, so the cloned ctx can contain more than `arrived_rf_num` filters.
+    // That is fine: the counter is only used to detect changes, and it is updated
+    // again on the next call.
+    _applied_rf_num = arrived_rf_num;
+    return Status::OK();
+}
+
+// Close every conjunct context held by this scanner: the stale ones replaced by
+// late-arriving runtime filters and the current one. Repeated calls after the
+// first are no-ops.
+Status VScanner::close(RuntimeState* state) {
+    if (!_is_closed) {
+        for (auto* stale_ctx : _stale_vexpr_ctxs) {
+            stale_ctx->close(state);
+        }
+        if (_vconjunct_ctx != nullptr) {
+            _vconjunct_ctx->close(state);
+        }
+        _is_closed = true;
+    }
+    return Status::OK();
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
new file mode 100644
index 0000000000..aff3c60393
--- /dev/null
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "common/status.h"
+#include "olap/tablet.h"
+#include "runtime/runtime_state.h"
+#include "vec/exprs/vexpr_context.h"
+
+namespace doris::vectorized {
+
+class Block;
+class VScanNode;
+
+// Base class of all vectorized scanners. A scanner reads one portion of a scan
+// node's data. Subclasses implement _get_block_impl() for the actual data access;
+// this base class handles filtering and the refresh of late-arriving runtime
+// filters.
+class VScanner {
+public:
+    VScanner(RuntimeState* state, VScanNode* parent, int64_t limit, MemTracker* mem_tracker);
+
+    virtual ~VScanner() = default;
+
+    virtual Status open(RuntimeState* state) { return Status::OK(); }
+
+    // Fill `block` (must be empty) with the next batch of filtered rows.
+    // Sets *eos to true when the scanner has no more data.
+    Status get_block(RuntimeState* state, Block* block, bool* eos);
+
+    virtual Status close(RuntimeState* state);
+
+    // Subclass must implement this to return the current rows read
+    virtual int64_t raw_rows_read() { return 0; }
+
+protected:
+    // Subclass should implement this to return data.
+    virtual Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) = 0;
+
+    // Init the input block if _input_tuple_desc is set.
+    // Otherwise, use output_block directly.
+    void _init_input_block(Block* output_block);
+
+    // Use prefilters to filter input block
+    Status _filter_input_block(Block* block);
+
+    // Convert input block to output block, if needed.
+    Status _convert_to_output_block(Block* output_block);
+
+    // Filter the output block finally.
+    Status _filter_output_block(Block* block);
+
+public:
+    VScanNode* get_parent() { return _parent; }
+
+    // Refresh the conjuncts if the parent has applied new late-arriving runtime filters.
+    Status try_append_late_arrival_runtime_filter();
+
+    // Call start_wait_worker_timer() when submit the scanner to the thread pool.
+    // And call update_wait_worker_timer() when it is actually being executed.
+    // NOTE: both are no-op placeholders for now.
+    void start_wait_worker_timer() {}
+    int64_t update_wait_worker_timer() { return 0; }
+
+    RuntimeState* runtime_state() { return _state; }
+
+    bool is_open() { return _is_open; }
+    void set_opened() { _is_open = true; }
+
+    // NOTE(review): placeholder. This looks up the fixed path "xxx", so presumably
+    // every scanner gets the same queue id. Replace it with the scanner's real
+    // storage path.
+    int queue_id() { return _state->exec_env()->store_path_to_index("xxx"); }
+
+    // Placeholder: always reports local storage.
+    doris::TabletStorageType get_storage_type() {
+        return doris::TabletStorageType::STORAGE_TYPE_LOCAL;
+    }
+
+    bool need_to_close() { return _need_to_close; }
+
+    void mark_to_need_to_close() { _need_to_close = true; }
+
+    void set_status_on_failure(const Status& st) { _status = st; }
+
+    VExprContext** vconjunct_ctx_ptr() { return &_vconjunct_ctx; }
+
+protected:
+    // Move the current conjunct ctx (if any) to _stale_vexpr_ctxs so it is closed in close().
+    void _discard_conjuncts() {
+        if (_vconjunct_ctx) {
+            _vconjunct_ctx->mark_as_stale();
+            _stale_vexpr_ctxs.push_back(_vconjunct_ctx);
+            _vconjunct_ctx = nullptr;
+        }
+    }
+
+protected:
+    RuntimeState* _state = nullptr;
+    VScanNode* _parent = nullptr;
+    // Set if scan node has sort limit info
+    int64_t _limit = -1;
+    MemTracker* _mem_tracker = nullptr;
+
+    const TupleDescriptor* _input_tuple_desc = nullptr;
+    const TupleDescriptor* _output_tuple_desc = nullptr;
+    // _input_tuple_desc if set, otherwise _output_tuple_desc.
+    const TupleDescriptor* _real_tuple_desc = nullptr;
+
+    // If _input_tuple_desc is set, the scanner will read data into
+    // this _input_block first, then convert to the output block.
+    Block _input_block;
+    // If _input_tuple_desc is set, this will point to _input_block,
+    // otherwise, it will point to the output block.
+    Block* _input_block_ptr = nullptr;
+
+    bool _is_open = false;
+    bool _is_closed = false;
+    bool _need_to_close = false;
+    Status _status;
+
+    // If _applied_rf_num == _total_rf_num
+    // means all runtime filters are arrived and applied.
+    int _applied_rf_num = 0;
+    int _total_rf_num = 0;
+    // Cloned from _vconjunct_ctx of scan node.
+    // It includes predicate in SQL and runtime filters.
+    VExprContext* _vconjunct_ctx = nullptr;
+    // Late arriving runtime filters will update _vconjunct_ctx.
+    // The old _vconjunct_ctx will be temporarily placed in _stale_vexpr_ctxs
+    // and will be destroyed at the end.
+    std::vector<VExprContext*> _stale_vexpr_ctxs;
+
+    int64_t _num_rows_read = 0;
+    int64_t _raw_rows_read = 0;
+    int64_t _num_rows_return = 0;
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/volap_scan_node.cpp 
b/be/src/vec/exec/volap_scan_node.cpp
index 5542eefeb0..f0c667f8f4 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -1171,12 +1171,7 @@ int 
VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per
     }
 
     // post volap scanners to thread-pool
-    ThreadPoolToken* thread_token = nullptr;
-    if (_limit > -1 && _limit < 1024) {
-        thread_token = state->get_query_fragments_ctx()->get_serial_token();
-    } else {
-        thread_token = state->get_query_fragments_ctx()->get_token();
-    }
+    ThreadPoolToken* thread_token = 
state->get_query_fragments_ctx()->get_token();
     auto iter = olap_scanners.begin();
     if (thread_token != nullptr) {
         while (iter != olap_scanners.end()) {
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index cb1c1aa263..19e8c28cad 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -175,6 +175,7 @@ Status BlockReader::_direct_next_block(Block* block, 
MemPool* mem_pool, ObjectPo
         return res;
     }
     *eof = res.precise_code() == OLAP_ERR_DATA_EOF;
+    _eof = *eof;
     if (UNLIKELY(_reader_context.record_rowids)) {
         res = 
_vcollect_iter.current_block_row_locations(&_block_row_locations);
         if (UNLIKELY(!res.ok() && res != 
Status::OLAPInternalError(OLAP_ERR_DATA_EOF))) {
@@ -208,6 +209,7 @@ Status BlockReader::_agg_key_next_block(Block* block, 
MemPool* mem_pool, ObjectP
     while (true) {
         auto res = _vcollect_iter.next(&_next_row);
         if (UNLIKELY(res.precise_code() == OLAP_ERR_DATA_EOF)) {
+            _eof = true;
             *eof = true;
             break;
         }
@@ -265,6 +267,7 @@ Status BlockReader::_unique_key_next_block(Block* block, 
MemPool* mem_pool, Obje
         // merge the lower versions
         auto res = _vcollect_iter.next(&_next_row);
         if (UNLIKELY(res.precise_code() == OLAP_ERR_DATA_EOF)) {
+            _eof = true;
             *eof = true;
             if (UNLIKELY(_reader_context.record_rowids)) {
                 _block_row_locations.resize(target_block_row);
diff --git a/be/src/vec/sink/vdata_stream_sender.h 
b/be/src/vec/sink/vdata_stream_sender.h
index 1c8bcf4f87..be9861caf5 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -179,7 +179,7 @@ public:
         _is_local = (_brpc_dest_addr.hostname == localhost) &&
                     (_brpc_dest_addr.port == config::brpc_port);
         if (_is_local) {
-            LOG(INFO) << "will use local Exchange, dest_node_id is : " << 
_dest_node_id;
+            VLOG_NOTICE << "will use local Exchange, dest_node_id is : " << 
_dest_node_id;
         }
     }
 
diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy
index 10f2bed777..bf50eafdf4 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy
@@ -375,7 +375,7 @@ class Config {
     String getDbNameByFile(File suiteFile) {
         String dir = new File(suitePath).relativePath(suiteFile.parentFile)
         // We put sql files under sql dir, so dbs and tables used by cases
-        // under sql directory should be prepared by load.groovy unbder the
+        // under sql directory should be prepared by load.groovy under the
         // parent.
         //
         // e.g.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to