This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c75e63a2a5c [Improvement](scan) Use scanner to do projection of scan node (#29124)
c75e63a2a5c is described below

commit c75e63a2a5c7a1caad7e55befeb7987db8d1ef13
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Wed Dec 27 16:00:52 2023 +0800

    [Improvement](scan) Use scanner to do projection of scan node (#29124)
---
 be/src/exec/exec_node.h                        |  2 +-
 be/src/pipeline/exec/scan_operator.cpp         |  8 +--
 be/src/pipeline/exec/scan_operator.h           |  4 ++
 be/src/pipeline/pipeline_x/operator.cpp        | 10 ++--
 be/src/pipeline/pipeline_x/operator.h          | 10 +++-
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp |  2 +-
 be/src/vec/exec/scan/pip_scanner_context.h     | 11 +++--
 be/src/vec/exec/scan/scanner_context.cpp       | 16 +++++-
 be/src/vec/exec/scan/scanner_context.h         |  3 ++
 be/src/vec/exec/scan/scanner_scheduler.cpp     |  2 +-
 be/src/vec/exec/scan/vscan_node.cpp            |  7 +--
 be/src/vec/exec/scan/vscan_node.h              | 14 ++++++
 be/src/vec/exec/scan/vscanner.cpp              | 68 +++++++++++++++++++++++++-
 be/src/vec/exec/scan/vscanner.h                |  6 +++
 14 files changed, 138 insertions(+), 25 deletions(-)

diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index eeed37907f9..69f7771767f 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -103,7 +103,7 @@ public:
     // TODO: AggregationNode and HashJoinNode cannot be "re-opened" yet.
     [[nodiscard]] virtual Status get_next(RuntimeState* state, 
vectorized::Block* block, bool* eos);
     // new interface to compatible new optimizers in FE
-    [[nodiscard]] Status get_next_after_projects(
+    [[nodiscard]] virtual Status get_next_after_projects(
             RuntimeState* state, vectorized::Block* block, bool* eos,
             const std::function<Status(RuntimeState*, vectorized::Block*, 
bool*)>& fn,
             bool clear_data = true);
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index af9529798a3..0c0ccd42410 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -1239,10 +1239,10 @@ template <typename Derived>
 Status ScanLocalState<Derived>::_start_scanners(
         const std::list<vectorized::VScannerSPtr>& scanners) {
     auto& p = _parent->cast<typename Derived::Parent>();
-    _scanner_ctx = PipScannerContext::create_shared(state(), this, 
p._output_tuple_desc, scanners,
-                                                    p.limit(), 
state()->scan_queue_mem_limit(),
-                                                    p._col_distribute_ids, 1, 
_scan_dependency,
-                                                    _finish_dependency);
+    _scanner_ctx = PipScannerContext::create_shared(
+            state(), this, p._output_tuple_desc, p.output_row_descriptor(), 
scanners, p.limit(),
+            state()->scan_queue_mem_limit(), p._col_distribute_ids, 1, 
_scan_dependency,
+            _finish_dependency);
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index afbcaea0a63..833ca55f095 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -427,6 +427,10 @@ public:
     Status open(RuntimeState* state) override;
     Status get_block(RuntimeState* state, vectorized::Block* block,
                      SourceState& source_state) override;
+    Status get_block_after_projects(RuntimeState* state, vectorized::Block* 
block,
+                                    SourceState& source_state) override {
+        return get_block(state, block, source_state);
+    }
     [[nodiscard]] bool is_source() const override { return true; }
 
     const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override {
diff --git a/be/src/pipeline/pipeline_x/operator.cpp 
b/be/src/pipeline/pipeline_x/operator.cpp
index b90c9551426..e763a6eee1d 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -193,8 +193,8 @@ Status OperatorXBase::do_projections(RuntimeState* state, 
vectorized::Block* ori
     return Status::OK();
 }
 
-Status OperatorXBase::get_next_after_projects(RuntimeState* state, 
vectorized::Block* block,
-                                              SourceState& source_state) {
+Status OperatorXBase::get_block_after_projects(RuntimeState* state, 
vectorized::Block* block,
+                                               SourceState& source_state) {
     auto local_state = state->get_local_state(operator_id());
     if (_output_row_descriptor) {
         local_state->clear_origin_block();
@@ -461,8 +461,8 @@ Status 
PipelineXSinkLocalState<DependencyType>::close(RuntimeState* state, Statu
 template <typename LocalStateType>
 Status StreamingOperatorX<LocalStateType>::get_block(RuntimeState* state, 
vectorized::Block* block,
                                                      SourceState& 
source_state) {
-    RETURN_IF_ERROR(OperatorX<LocalStateType>::_child_x->get_next_after_projects(state, block,
-                                                                                 source_state));
+    RETURN_IF_ERROR(OperatorX<LocalStateType>::_child_x->get_block_after_projects(state, block,
+                                                                                  source_state));
     return pull(state, block, source_state);
 }
 
@@ -473,7 +473,7 @@ Status 
StatefulOperatorX<LocalStateType>::get_block(RuntimeState* state, vectori
                                 ->template cast<LocalStateType>();
     if (need_more_input_data(state)) {
         local_state._child_block->clear_column_data();
-        RETURN_IF_ERROR(OperatorX<LocalStateType>::_child_x->get_next_after_projects(
+        RETURN_IF_ERROR(OperatorX<LocalStateType>::_child_x->get_block_after_projects(
                 state, local_state._child_block.get(), 
local_state._child_source_state));
         source_state = local_state._child_source_state;
         if (local_state._child_block->rows() == 0 &&
diff --git a/be/src/pipeline/pipeline_x/operator.h 
b/be/src/pipeline/pipeline_x/operator.h
index c4e0a7e94a0..ec2500acf33 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -271,10 +271,15 @@ public:
         return _output_row_descriptor ? *_output_row_descriptor : 
_row_descriptor;
     }
 
+    [[nodiscard]] const RowDescriptor* output_row_descriptor() {
+        return _output_row_descriptor.get();
+    }
+
     [[nodiscard]] bool is_source() const override { return false; }
 
-    Status get_next_after_projects(RuntimeState* state, vectorized::Block* 
block,
-                                   SourceState& source_state);
+    [[nodiscard]] virtual Status get_block_after_projects(RuntimeState* state,
+                                                          vectorized::Block* 
block,
+                                                          SourceState& 
source_state);
 
     /// Only use in vectorized exec engine try to do projections to trans 
_row_desc -> _output_row_desc
     Status do_projections(RuntimeState* state, vectorized::Block* origin_block,
@@ -286,6 +291,7 @@ protected:
     template <typename Dependency>
     friend class PipelineXLocalState;
     friend class PipelineXLocalStateBase;
+    friend class VScanner;
     const int _operator_id;
     const int _node_id; // unique w/in single plan tree
     TPlanNodeType::type _type;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index be62fcac213..1a0c0749e8b 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -272,7 +272,7 @@ Status PipelineXTask::execute(bool* eos) {
         if (!_dry_run) {
             SCOPED_TIMER(_get_block_timer);
             _get_block_counter->update(1);
-            RETURN_IF_ERROR(_root->get_next_after_projects(_state, block, 
_data_state));
+            RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, 
_data_state));
         } else {
             _data_state = SourceState::FINISHED;
         }
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h 
b/be/src/vec/exec/scan/pip_scanner_context.h
index 42726cb17fe..fbf59fffab2 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -31,23 +31,26 @@ class PipScannerContext : public vectorized::ScannerContext 
{
 public:
     PipScannerContext(RuntimeState* state, vectorized::VScanNode* parent,
                       const TupleDescriptor* output_tuple_desc,
+                      const RowDescriptor* output_row_descriptor,
                       const std::list<vectorized::VScannerSPtr>& scanners, 
int64_t limit_,
                       int64_t max_bytes_in_blocks_queue, const 
std::vector<int>& col_distribute_ids,
                       const int num_parallel_instances)
-            : vectorized::ScannerContext(state, parent, output_tuple_desc, 
scanners, limit_,
-                                         max_bytes_in_blocks_queue, 
num_parallel_instances),
+            : vectorized::ScannerContext(state, parent, output_tuple_desc, 
output_row_descriptor,
+                                         scanners, limit_, 
max_bytes_in_blocks_queue,
+                                         num_parallel_instances),
               _col_distribute_ids(col_distribute_ids),
               _need_colocate_distribute(!_col_distribute_ids.empty()) {}
 
     PipScannerContext(RuntimeState* state, ScanLocalStateBase* local_state,
                       const TupleDescriptor* output_tuple_desc,
+                      const RowDescriptor* output_row_descriptor,
                       const std::list<vectorized::VScannerSPtr>& scanners, 
int64_t limit_,
                       int64_t max_bytes_in_blocks_queue, const 
std::vector<int>& col_distribute_ids,
                       const int num_parallel_instances,
                       std::shared_ptr<pipeline::ScanDependency> dependency,
                       std::shared_ptr<pipeline::Dependency> finish_dependency)
-            : vectorized::ScannerContext(state, output_tuple_desc, scanners, 
limit_,
-                                         max_bytes_in_blocks_queue, 
num_parallel_instances,
+            : vectorized::ScannerContext(state, output_tuple_desc, 
output_row_descriptor, scanners,
+                                         limit_, max_bytes_in_blocks_queue, 
num_parallel_instances,
                                          local_state, dependency, 
finish_dependency),
               _need_colocate_distribute(false) {}
 
diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index 4d3820d7634..954c294574f 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -46,6 +46,7 @@ namespace doris::vectorized {
 using namespace std::chrono_literals;
 
 ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* 
output_tuple_desc,
+                               const RowDescriptor* output_row_descriptor,
                                const std::list<VScannerSPtr>& scanners, 
int64_t limit_,
                                int64_t max_bytes_in_blocks_queue, const int 
num_parallel_instances,
                                pipeline::ScanLocalStateBase* local_state,
@@ -54,7 +55,10 @@ ScannerContext::ScannerContext(RuntimeState* state, const 
TupleDescriptor* outpu
         : _state(state),
           _parent(nullptr),
           _local_state(local_state),
-          _output_tuple_desc(output_tuple_desc),
+          _output_tuple_desc(output_row_descriptor
+                                     ? 
output_row_descriptor->tuple_descriptors().front()
+                                     : output_tuple_desc),
+          _output_row_descriptor(output_row_descriptor),
           _process_status(Status::OK()),
           _batch_size(state->batch_size()),
           limit(limit_),
@@ -66,6 +70,8 @@ ScannerContext::ScannerContext(RuntimeState* state, const 
TupleDescriptor* outpu
           _num_parallel_instances(num_parallel_instances),
           _dependency(dependency),
           _finish_dependency(finish_dependency) {
+    DCHECK(_output_row_descriptor == nullptr ||
+           _output_row_descriptor->tuple_descriptors().size() == 1);
     // Use the task exec context as a lock between scanner threads and 
fragment exection threads
     _task_exec_ctx = _state->get_task_execution_context();
     _query_id = _state->get_query_ctx()->query_id();
@@ -92,13 +98,17 @@ ScannerContext::ScannerContext(RuntimeState* state, const 
TupleDescriptor* outpu
 
 ScannerContext::ScannerContext(doris::RuntimeState* state, 
doris::vectorized::VScanNode* parent,
                                const doris::TupleDescriptor* output_tuple_desc,
+                               const RowDescriptor* output_row_descriptor,
                                const std::list<VScannerSPtr>& scanners, 
int64_t limit_,
                                int64_t max_bytes_in_blocks_queue, const int 
num_parallel_instances,
                                pipeline::ScanLocalStateBase* local_state)
         : _state(state),
           _parent(parent),
           _local_state(local_state),
-          _output_tuple_desc(output_tuple_desc),
+          _output_tuple_desc(output_row_descriptor
+                                     ? 
output_row_descriptor->tuple_descriptors().front()
+                                     : output_tuple_desc),
+          _output_row_descriptor(output_row_descriptor),
           _process_status(Status::OK()),
           _batch_size(state->batch_size()),
           limit(limit_),
@@ -108,6 +118,8 @@ ScannerContext::ScannerContext(doris::RuntimeState* state, 
doris::vectorized::VS
           _scanners(scanners),
           _scanners_ref(scanners.begin(), scanners.end()),
           _num_parallel_instances(num_parallel_instances) {
+    DCHECK(_output_row_descriptor == nullptr ||
+           _output_row_descriptor->tuple_descriptors().size() == 1);
     // Use the task exec context as a lock between scanner threads and 
fragment exection threads
     _task_exec_ctx = _state->get_task_execution_context();
     _query_id = _state->get_query_ctx()->query_id();
diff --git a/be/src/vec/exec/scan/scanner_context.h 
b/be/src/vec/exec/scan/scanner_context.h
index ba9c1fdee10..6a3e8553f8f 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -70,6 +70,7 @@ class ScannerContext : public 
std::enable_shared_from_this<ScannerContext> {
 
 public:
     ScannerContext(RuntimeState* state, VScanNode* parent, const 
TupleDescriptor* output_tuple_desc,
+                   const RowDescriptor* output_row_descriptor,
                    const std::list<VScannerSPtr>& scanners, int64_t limit_,
                    int64_t max_bytes_in_blocks_queue, const int 
num_parallel_instances = 1,
                    pipeline::ScanLocalStateBase* local_state = nullptr);
@@ -187,6 +188,7 @@ private:
 
 protected:
     ScannerContext(RuntimeState* state_, const TupleDescriptor* 
output_tuple_desc,
+                   const RowDescriptor* output_row_descriptor,
                    const std::list<VScannerSPtr>& scanners_, int64_t limit_,
                    int64_t max_bytes_in_blocks_queue_, const int 
num_parallel_instances,
                    pipeline::ScanLocalStateBase* local_state,
@@ -203,6 +205,7 @@ protected:
 
     // the comment of same fields in VScanNode
     const TupleDescriptor* _output_tuple_desc = nullptr;
+    const RowDescriptor* _output_row_descriptor = nullptr;
 
     // _transfer_lock is used to protect the critical section
     // where the ScanNode and ScannerScheduler interact.
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp 
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 84f56813c34..6ec83e8bd6a 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -351,7 +351,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* 
scheduler,
 
         BlockUPtr block = ctx->get_free_block();
 
-        status = scanner->get_block(state, block.get(), &eos);
+        status = scanner->get_block_after_projects(state, block.get(), &eos);
         // The VFileScanner for external table may try to open not exist files,
         // Because FE file cache for external table may out of date.
         // So, NOT_FOUND for VFileScanner is not a fail case.
diff --git a/be/src/vec/exec/scan/vscan_node.cpp 
b/be/src/vec/exec/scan/vscan_node.cpp
index 5176d7900b3..c45468e0514 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -323,10 +323,11 @@ Status VScanNode::_start_scanners(const 
std::list<VScannerSPtr>& scanners,
     if (_is_pipeline_scan) {
         int max_queue_size = _shared_scan_opt ? 
std::max(query_parallel_instance_num, 1) : 1;
         _scanner_ctx = pipeline::PipScannerContext::create_shared(
-                _state, this, _output_tuple_desc, scanners, limit(), 
_state->scan_queue_mem_limit(),
-                _col_distribute_ids, max_queue_size);
+                _state, this, _output_tuple_desc, 
_output_row_descriptor.get(), scanners, limit(),
+                _state->scan_queue_mem_limit(), _col_distribute_ids, 
max_queue_size);
     } else {
-        _scanner_ctx = ScannerContext::create_shared(_state, this, 
_output_tuple_desc, scanners,
+        _scanner_ctx = ScannerContext::create_shared(_state, this, 
_output_tuple_desc,
+                                                     
_output_row_descriptor.get(), scanners,
                                                      limit(), 
_state->scan_queue_mem_limit());
     }
     return Status::OK();
diff --git a/be/src/vec/exec/scan/vscan_node.h 
b/be/src/vec/exec/scan/vscan_node.h
index 5917d0ff46b..7cd8b15991e 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -177,6 +177,20 @@ public:
 
     RuntimeProfile* scanner_profile() { return _scanner_profile.get(); }
 
+    Status get_next_after_projects(
+            RuntimeState* state, vectorized::Block* block, bool* eos,
+            const std::function<Status(RuntimeState*, vectorized::Block*, 
bool*)>& fn,
+            bool clear_data = true) override {
+        Defer defer([block, this]() {
+            if (block && !block->empty()) {
+                COUNTER_UPDATE(_output_bytes_counter, 
block->allocated_bytes());
+                COUNTER_UPDATE(_block_count_counter, 1);
+            }
+        });
+        _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
+        return get_next(state, block, eos);
+    }
+
 protected:
     // Different data sources register different profiles by implementing this 
method
     virtual Status _init_profile();
diff --git a/be/src/vec/exec/scan/vscanner.cpp 
b/be/src/vec/exec/scan/vscanner.cpp
index a7c0c3c4062..0a7a8c9c019 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -35,7 +35,8 @@ VScanner::VScanner(RuntimeState* state, VScanNode* parent, 
int64_t limit, Runtim
           _local_state(nullptr),
           _limit(limit),
           _profile(profile),
-          _output_tuple_desc(parent->output_tuple_desc()) {
+          _output_tuple_desc(parent->output_tuple_desc()),
+          _output_row_descriptor(_parent->_output_row_descriptor.get()) {
     _total_rf_num = _parent->runtime_filter_num();
 }
 
@@ -46,7 +47,8 @@ VScanner::VScanner(RuntimeState* state, 
pipeline::ScanLocalStateBase* local_stat
           _local_state(local_state),
           _limit(limit),
           _profile(profile),
-          _output_tuple_desc(_local_state->output_tuple_desc()) {
+          _output_tuple_desc(_local_state->output_tuple_desc()),
+          _output_row_descriptor(_local_state->_parent->output_row_descriptor()) {
     _total_rf_num = _local_state->runtime_filter_num();
 }
 
@@ -58,9 +60,30 @@ Status VScanner::prepare(RuntimeState* state, const 
VExprContextSPtrs& conjuncts
         }
     }
 
+    const auto& projections = _parent ? _parent->_projections : 
_local_state->_projections;
+    if (!projections.empty()) {
+        _projections.resize(projections.size());
+        for (size_t i = 0; i != projections.size(); ++i) {
+            RETURN_IF_ERROR(projections[i]->clone(state, _projections[i]));
+        }
+    }
+
     return Status::OK();
 }
 
+Status VScanner::get_block_after_projects(RuntimeState* state, 
vectorized::Block* block,
+                                          bool* eos) {
+    auto& row_descriptor =
+            _parent ? _parent->_row_descriptor : 
_local_state->_parent->row_descriptor();
+    if (_output_row_descriptor) {
+        _origin_block.clear_column_data(row_descriptor.num_materialized_slots());
+        auto status = get_block(state, &_origin_block, eos);
+        if (UNLIKELY(!status.ok())) return status;
+        return _do_projections(&_origin_block, block);
+    }
+    return get_block(state, block, eos);
+}
+
 Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) {
     // only empty block should be here
     DCHECK(block->rows() == 0);
@@ -138,6 +161,47 @@ Status VScanner::_filter_output_block(Block* block) {
     return st;
 }
 
+Status VScanner::_do_projections(vectorized::Block* origin_block, 
vectorized::Block* output_block) {
+    auto projection_timer = _parent ? _parent->_projection_timer : 
_local_state->_projection_timer;
+    auto exec_timer = _parent ? _parent->_exec_timer : 
_local_state->_exec_timer;
+    SCOPED_TIMER(exec_timer);
+    SCOPED_TIMER(projection_timer);
+
+    MutableBlock mutable_block =
+            VectorizedUtils::build_mutable_mem_reuse_block(output_block, 
*_output_row_descriptor);
+    auto rows = origin_block->rows();
+
+    if (rows != 0) {
+        auto& mutable_columns = mutable_block.mutable_columns();
+
+        if (mutable_columns.size() != _projections.size()) {
+            return Status::InternalError(
+                    "Logical error in scanner, output of projections {} 
mismatches with "
+                    "scanner output {}",
+                    _projections.size(), mutable_columns.size());
+        }
+
+        for (int i = 0; i < mutable_columns.size(); ++i) {
+            auto result_column_id = -1;
+            RETURN_IF_ERROR(_projections[i]->execute(origin_block, 
&result_column_id));
+            auto column_ptr = origin_block->get_by_position(result_column_id)
+                                      
.column->convert_to_full_column_if_const();
+            //TODO: this is a quick fix, we need a new function like 
"change_to_nullable" to do it
+            if (mutable_columns[i]->is_nullable() xor 
column_ptr->is_nullable()) {
+                DCHECK(mutable_columns[i]->is_nullable() && 
!column_ptr->is_nullable());
+                reinterpret_cast<ColumnNullable*>(mutable_columns[i].get())
+                        ->insert_range_from_not_nullable(*column_ptr, 0, rows);
+            } else {
+                mutable_columns[i]->insert_range_from(*column_ptr, 0, rows);
+            }
+        }
+        DCHECK(mutable_block.rows() == rows);
+        output_block->set_columns(std::move(mutable_columns));
+    }
+
+    return Status::OK();
+}
+
 Status VScanner::try_append_late_arrival_runtime_filter() {
     if (_applied_rf_num == _total_rf_num) {
         return Status::OK();
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 29daf9a68c5..b00c3f348f4 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -67,6 +67,7 @@ public:
     virtual Status open(RuntimeState* state) { return Status::OK(); }
 
     Status get_block(RuntimeState* state, Block* block, bool* eos);
+    Status get_block_after_projects(RuntimeState* state, vectorized::Block* 
block, bool* eos);
 
     virtual Status close(RuntimeState* state);
 
@@ -89,6 +90,8 @@ protected:
     // Filter the output block finally.
     Status _filter_output_block(Block* block);
 
+    Status _do_projections(vectorized::Block* origin_block, vectorized::Block* 
output_block);
+
     // Not virtual, all child will call this method explictly
     Status prepare(RuntimeState* state, const VExprContextSPtrs& conjuncts);
 
@@ -172,6 +175,7 @@ protected:
     RuntimeProfile* _profile = nullptr;
 
     const TupleDescriptor* _output_tuple_desc = nullptr;
+    const RowDescriptor* _output_row_descriptor = nullptr;
 
     // If _input_tuple_desc is set, the scanner will read data into
     // this _input_block first, then convert to the output block.
@@ -189,6 +193,8 @@ protected:
     // Cloned from _conjuncts of scan node.
     // It includes predicate in SQL and runtime filters.
     VExprContextSPtrs _conjuncts;
+    VExprContextSPtrs _projections;
+    vectorized::Block _origin_block;
 
     VExprContextSPtrs _common_expr_ctxs_push_down;
     // Late arriving runtime filters will update _conjuncts.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to