This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new c75e63a2a5c [Improvement](scan) Use scanner to do projection of scan node (#29124) c75e63a2a5c is described below commit c75e63a2a5c7a1caad7e55befeb7987db8d1ef13 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Wed Dec 27 16:00:52 2023 +0800 [Improvement](scan) Use scanner to do projection of scan node (#29124) --- be/src/exec/exec_node.h | 2 +- be/src/pipeline/exec/scan_operator.cpp | 8 +-- be/src/pipeline/exec/scan_operator.h | 4 ++ be/src/pipeline/pipeline_x/operator.cpp | 10 ++-- be/src/pipeline/pipeline_x/operator.h | 10 +++- be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 2 +- be/src/vec/exec/scan/pip_scanner_context.h | 11 +++-- be/src/vec/exec/scan/scanner_context.cpp | 16 +++++- be/src/vec/exec/scan/scanner_context.h | 3 ++ be/src/vec/exec/scan/scanner_scheduler.cpp | 2 +- be/src/vec/exec/scan/vscan_node.cpp | 7 +-- be/src/vec/exec/scan/vscan_node.h | 14 ++++++ be/src/vec/exec/scan/vscanner.cpp | 68 +++++++++++++++++++++++++- be/src/vec/exec/scan/vscanner.h | 6 +++ 14 files changed, 138 insertions(+), 25 deletions(-) diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index eeed37907f9..69f7771767f 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -103,7 +103,7 @@ public: // TODO: AggregationNode and HashJoinNode cannot be "re-opened" yet. [[nodiscard]] virtual Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos); // new interface to compatible new optimizers in FE - [[nodiscard]] Status get_next_after_projects( + [[nodiscard]] virtual Status get_next_after_projects( RuntimeState* state, vectorized::Block* block, bool* eos, const std::function<Status(RuntimeState*, vectorized::Block*, bool*)>& fn, bool clear_data = true); diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index af9529798a3..0c0ccd42410 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -1239,10 +1239,10 @@ template <typename Derived> Status ScanLocalState<Derived>::_start_scanners( const std::list<vectorized::VScannerSPtr>& scanners) { auto& p = _parent->cast<typename Derived::Parent>(); - _scanner_ctx = PipScannerContext::create_shared(state(), this, p._output_tuple_desc, scanners, - p.limit(), state()->scan_queue_mem_limit(), - p._col_distribute_ids, 1, _scan_dependency, - _finish_dependency); + _scanner_ctx = PipScannerContext::create_shared( + state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(), + state()->scan_queue_mem_limit(), p._col_distribute_ids, 1, _scan_dependency, + _finish_dependency); return Status::OK(); } diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index afbcaea0a63..833ca55f095 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -427,6 +427,10 @@ public: Status open(RuntimeState* state) override; Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) override; + Status get_block_after_projects(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) override { + return get_block(state, block, source_state); + } [[nodiscard]] bool is_source() const override { return true; } const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override { diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index b90c9551426..e763a6eee1d 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -193,8 +193,8 @@ Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* ori return Status::OK(); } -Status OperatorXBase::get_next_after_projects(RuntimeState* state, vectorized::Block* block, - SourceState& source_state) { +Status OperatorXBase::get_block_after_projects(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) { auto local_state = state->get_local_state(operator_id()); if (_output_row_descriptor) { local_state->clear_origin_block(); @@ -461,8 +461,8 @@ Status PipelineXSinkLocalState<DependencyType>::close(RuntimeState* state, Statu template <typename LocalStateType> Status StreamingOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) { - RETURN_IF_ERROR(OperatorX<LocalStateType>::_child_x->get_next_after_projects(state, block, - source_state)); + RETURN_IF_ERROR(OperatorX<LocalStateType>::_child_x->get_block_after_projects(state, block, + source_state)); return pull(state, block, source_state); } @@ -473,7 +473,7 @@ Status StatefulOperatorX<LocalStateType>::get_block(RuntimeState* state, vectori ->template cast<LocalStateType>(); if (need_more_input_data(state)) { local_state._child_block->clear_column_data(); - RETURN_IF_ERROR(OperatorX<LocalStateType>::_child_x->get_next_after_projects( + RETURN_IF_ERROR(OperatorX<LocalStateType>::_child_x->get_block_after_projects( state, local_state._child_block.get(), local_state._child_source_state)); source_state = local_state._child_source_state; if (local_state._child_block->rows() == 0 && diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index c4e0a7e94a0..ec2500acf33 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -271,10 +271,15 @@ public: return _output_row_descriptor ? *_output_row_descriptor : _row_descriptor; } + [[nodiscard]] const RowDescriptor* output_row_descriptor() { + return _output_row_descriptor.get(); + } + [[nodiscard]] bool is_source() const override { return false; } - Status get_next_after_projects(RuntimeState* state, vectorized::Block* block, - SourceState& source_state); + [[nodiscard]] virtual Status get_block_after_projects(RuntimeState* state, + vectorized::Block* block, + SourceState& source_state); /// Only use in vectorized exec engine try to do projections to trans _row_desc -> _output_row_desc Status do_projections(RuntimeState* state, vectorized::Block* origin_block, @@ -286,6 +291,7 @@ protected: template <typename Dependency> friend class PipelineXLocalState; friend class PipelineXLocalStateBase; + friend class VScanner; const int _operator_id; const int _node_id; // unique w/in single plan tree TPlanNodeType::type _type; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index be62fcac213..1a0c0749e8b 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -272,7 +272,7 @@ Status PipelineXTask::execute(bool* eos) { if (!_dry_run) { SCOPED_TIMER(_get_block_timer); _get_block_counter->update(1); - RETURN_IF_ERROR(_root->get_next_after_projects(_state, block, _data_state)); + RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, _data_state)); } else { _data_state = SourceState::FINISHED; } diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index 42726cb17fe..fbf59fffab2 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -31,23 +31,26 @@ class PipScannerContext : public vectorized::ScannerContext { public: PipScannerContext(RuntimeState* state, vectorized::VScanNode* parent, const TupleDescriptor* output_tuple_desc, + const RowDescriptor* output_row_descriptor, const std::list<vectorized::VScannerSPtr>& scanners, int64_t limit_, int64_t max_bytes_in_blocks_queue, const std::vector<int>& col_distribute_ids, const int num_parallel_instances) - : vectorized::ScannerContext(state, parent, output_tuple_desc, scanners, limit_, - max_bytes_in_blocks_queue, num_parallel_instances), + : vectorized::ScannerContext(state, parent, output_tuple_desc, output_row_descriptor, + scanners, limit_, max_bytes_in_blocks_queue, + num_parallel_instances), _col_distribute_ids(col_distribute_ids), _need_colocate_distribute(!_col_distribute_ids.empty()) {} PipScannerContext(RuntimeState* state, ScanLocalStateBase* local_state, const TupleDescriptor* output_tuple_desc, + const RowDescriptor* output_row_descriptor, const std::list<vectorized::VScannerSPtr>& scanners, int64_t limit_, int64_t max_bytes_in_blocks_queue, const std::vector<int>& col_distribute_ids, const int num_parallel_instances, std::shared_ptr<pipeline::ScanDependency> dependency, std::shared_ptr<pipeline::Dependency> finish_dependency) - : vectorized::ScannerContext(state, output_tuple_desc, scanners, limit_, - max_bytes_in_blocks_queue, num_parallel_instances, + : vectorized::ScannerContext(state, output_tuple_desc, output_row_descriptor, scanners, + limit_, max_bytes_in_blocks_queue, num_parallel_instances, local_state, dependency, finish_dependency), _need_colocate_distribute(false) {} diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 4d3820d7634..954c294574f 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -46,6 +46,7 @@ namespace doris::vectorized { using namespace std::chrono_literals; ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* output_tuple_desc, + const RowDescriptor* output_row_descriptor, const std::list<VScannerSPtr>& scanners, int64_t limit_, int64_t max_bytes_in_blocks_queue, const int num_parallel_instances, pipeline::ScanLocalStateBase* local_state, @@ -54,7 +55,10 @@ ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* outpu : _state(state), _parent(nullptr), _local_state(local_state), - _output_tuple_desc(output_tuple_desc), + _output_tuple_desc(output_row_descriptor + ? output_row_descriptor->tuple_descriptors().front() + : output_tuple_desc), + _output_row_descriptor(output_row_descriptor), _process_status(Status::OK()), _batch_size(state->batch_size()), limit(limit_), @@ -66,6 +70,8 @@ ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* outpu _num_parallel_instances(num_parallel_instances), _dependency(dependency), _finish_dependency(finish_dependency) { + DCHECK(_output_row_descriptor == nullptr || + _output_row_descriptor->tuple_descriptors().size() == 1); // Use the task exec context as a lock between scanner threads and fragment exection threads _task_exec_ctx = _state->get_task_execution_context(); _query_id = _state->get_query_ctx()->query_id(); @@ -92,13 +98,17 @@ ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* outpu ScannerContext::ScannerContext(doris::RuntimeState* state, doris::vectorized::VScanNode* parent, const doris::TupleDescriptor* output_tuple_desc, + const RowDescriptor* output_row_descriptor, const std::list<VScannerSPtr>& scanners, int64_t limit_, int64_t max_bytes_in_blocks_queue, const int num_parallel_instances, pipeline::ScanLocalStateBase* local_state) : _state(state), _parent(parent), _local_state(local_state), - _output_tuple_desc(output_tuple_desc), + _output_tuple_desc(output_row_descriptor + ? output_row_descriptor->tuple_descriptors().front() + : output_tuple_desc), + _output_row_descriptor(output_row_descriptor), _process_status(Status::OK()), _batch_size(state->batch_size()), limit(limit_), @@ -108,6 +118,8 @@ ScannerContext::ScannerContext(doris::RuntimeState* state, doris::vectorized::VS _scanners(scanners), _scanners_ref(scanners.begin(), scanners.end()), _num_parallel_instances(num_parallel_instances) { + DCHECK(_output_row_descriptor == nullptr || + _output_row_descriptor->tuple_descriptors().size() == 1); // Use the task exec context as a lock between scanner threads and fragment exection threads _task_exec_ctx = _state->get_task_execution_context(); _query_id = _state->get_query_ctx()->query_id(); diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index ba9c1fdee10..6a3e8553f8f 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -70,6 +70,7 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext> { public: ScannerContext(RuntimeState* state, VScanNode* parent, const TupleDescriptor* output_tuple_desc, + const RowDescriptor* output_row_descriptor, const std::list<VScannerSPtr>& scanners, int64_t limit_, int64_t max_bytes_in_blocks_queue, const int num_parallel_instances = 1, pipeline::ScanLocalStateBase* local_state = nullptr); @@ -187,6 +188,7 @@ private: protected: ScannerContext(RuntimeState* state_, const TupleDescriptor* output_tuple_desc, + const RowDescriptor* output_row_descriptor, const std::list<VScannerSPtr>& scanners_, int64_t limit_, int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances, pipeline::ScanLocalStateBase* local_state, @@ -203,6 +205,7 @@ protected: // the comment of same fields in VScanNode const TupleDescriptor* _output_tuple_desc = nullptr; + const RowDescriptor* _output_row_descriptor = nullptr; // _transfer_lock is used to protect the critical section // where the ScanNode and ScannerScheduler interact. diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 84f56813c34..6ec83e8bd6a 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -351,7 +351,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, BlockUPtr block = ctx->get_free_block(); - status = scanner->get_block(state, block.get(), &eos); + status = scanner->get_block_after_projects(state, block.get(), &eos); // The VFileScanner for external table may try to open not exist files, // Because FE file cache for external table may out of date. // So, NOT_FOUND for VFileScanner is not a fail case. diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 5176d7900b3..c45468e0514 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -323,10 +323,11 @@ Status VScanNode::_start_scanners(const std::list<VScannerSPtr>& scanners, if (_is_pipeline_scan) { int max_queue_size = _shared_scan_opt ? std::max(query_parallel_instance_num, 1) : 1; _scanner_ctx = pipeline::PipScannerContext::create_shared( - _state, this, _output_tuple_desc, scanners, limit(), _state->scan_queue_mem_limit(), - _col_distribute_ids, max_queue_size); + _state, this, _output_tuple_desc, _output_row_descriptor.get(), scanners, limit(), + _state->scan_queue_mem_limit(), _col_distribute_ids, max_queue_size); } else { - _scanner_ctx = ScannerContext::create_shared(_state, this, _output_tuple_desc, scanners, + _scanner_ctx = ScannerContext::create_shared(_state, this, _output_tuple_desc, + _output_row_descriptor.get(), scanners, limit(), _state->scan_queue_mem_limit()); } return Status::OK(); diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index 5917d0ff46b..7cd8b15991e 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -177,6 +177,20 @@ public: RuntimeProfile* scanner_profile() { return _scanner_profile.get(); } + Status get_next_after_projects( + RuntimeState* state, vectorized::Block* block, bool* eos, + const std::function<Status(RuntimeState*, vectorized::Block*, bool*)>& fn, + bool clear_data = true) override { + Defer defer([block, this]() { + if (block && !block->empty()) { + COUNTER_UPDATE(_output_bytes_counter, block->allocated_bytes()); + COUNTER_UPDATE(_block_count_counter, 1); + } + }); + _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); + return get_next(state, block, eos); + } + protected: // Different data sources register different profiles by implementing this method virtual Status _init_profile(); diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp index a7c0c3c4062..0a7a8c9c019 100644 --- a/be/src/vec/exec/scan/vscanner.cpp +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -35,7 +35,8 @@ VScanner::VScanner(RuntimeState* state, VScanNode* parent, int64_t limit, Runtim _local_state(nullptr), _limit(limit), _profile(profile), - _output_tuple_desc(parent->output_tuple_desc()) { + _output_tuple_desc(parent->output_tuple_desc()), + _output_row_descriptor(_parent->_output_row_descriptor.get()) { _total_rf_num = _parent->runtime_filter_num(); } @@ -46,7 +47,8 @@ VScanner::VScanner(RuntimeState* state, pipeline::ScanLocalStateBase* local_stat _local_state(local_state), _limit(limit), _profile(profile), - _output_tuple_desc(_local_state->output_tuple_desc()) { + _output_tuple_desc(_local_state->output_tuple_desc()), + _output_row_descriptor(_local_state->_parent->output_row_descriptor()) { _total_rf_num = _local_state->runtime_filter_num(); } @@ -58,9 +60,30 @@ Status VScanner::prepare(RuntimeState* state, const VExprContextSPtrs& conjuncts } } + const auto& projections = _parent ? _parent->_projections : _local_state->_projections; + if (!projections.empty()) { + _projections.resize(projections.size()); + for (size_t i = 0; i != projections.size(); ++i) { + RETURN_IF_ERROR(projections[i]->clone(state, _projections[i])); + } + } + return Status::OK(); } +Status VScanner::get_block_after_projects(RuntimeState* state, vectorized::Block* block, + bool* eos) { + auto& row_descriptor = + _parent ? _parent->_row_descriptor : _local_state->_parent->row_descriptor(); + if (_output_row_descriptor) { + _origin_block.clear_column_data(row_descriptor.num_materialized_slots()); + auto status = get_block(state, &_origin_block, eos); + if (UNLIKELY(!status.ok())) return status; + return _do_projections(&_origin_block, block); + } + return get_block(state, block, eos); +} + Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { // only empty block should be here DCHECK(block->rows() == 0); @@ -138,6 +161,47 @@ Status VScanner::_filter_output_block(Block* block) { return st; } +Status VScanner::_do_projections(vectorized::Block* origin_block, vectorized::Block* output_block) { + auto projection_timer = _parent ? _parent->_projection_timer : _local_state->_projection_timer; + auto exec_timer = _parent ? _parent->_exec_timer : _local_state->_exec_timer; + SCOPED_TIMER(exec_timer); + SCOPED_TIMER(projection_timer); + + MutableBlock mutable_block = + VectorizedUtils::build_mutable_mem_reuse_block(output_block, *_output_row_descriptor); + auto rows = origin_block->rows(); + + if (rows != 0) { + auto& mutable_columns = mutable_block.mutable_columns(); + + if (mutable_columns.size() != _projections.size()) { + return Status::InternalError( + "Logical error in scanner, output of projections {} mismatches with " + "scanner output {}", + _projections.size(), mutable_columns.size()); + } + + for (int i = 0; i < mutable_columns.size(); ++i) { + auto result_column_id = -1; + RETURN_IF_ERROR(_projections[i]->execute(origin_block, &result_column_id)); + auto column_ptr = origin_block->get_by_position(result_column_id) + .column->convert_to_full_column_if_const(); + //TODO: this is a quick fix, we need a new function like "change_to_nullable" to do it + if (mutable_columns[i]->is_nullable() xor column_ptr->is_nullable()) { + DCHECK(mutable_columns[i]->is_nullable() && !column_ptr->is_nullable()); + reinterpret_cast<ColumnNullable*>(mutable_columns[i].get()) + ->insert_range_from_not_nullable(*column_ptr, 0, rows); + } else { + mutable_columns[i]->insert_range_from(*column_ptr, 0, rows); + } + } + DCHECK(mutable_block.rows() == rows); + output_block->set_columns(std::move(mutable_columns)); + } + + return Status::OK(); +} + Status VScanner::try_append_late_arrival_runtime_filter() { if (_applied_rf_num == _total_rf_num) { return Status::OK(); diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index 29daf9a68c5..b00c3f348f4 100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -67,6 +67,7 @@ public: virtual Status open(RuntimeState* state) { return Status::OK(); } Status get_block(RuntimeState* state, Block* block, bool* eos); + Status get_block_after_projects(RuntimeState* state, vectorized::Block* block, bool* eos); virtual Status close(RuntimeState* state); @@ -89,6 +90,8 @@ protected: // Filter the output block finally. Status _filter_output_block(Block* block); + Status _do_projections(vectorized::Block* origin_block, vectorized::Block* output_block); + // Not virtual, all child will call this method explictly Status prepare(RuntimeState* state, const VExprContextSPtrs& conjuncts); @@ -172,6 +175,7 @@ protected: RuntimeProfile* _profile = nullptr; const TupleDescriptor* _output_tuple_desc = nullptr; + const RowDescriptor* _output_row_descriptor = nullptr; // If _input_tuple_desc is set, the scanner will read data into // this _input_block first, then convert to the output block. @@ -189,6 +193,8 @@ protected: // Cloned from _conjuncts of scan node. // It includes predicate in SQL and runtime filters. VExprContextSPtrs _conjuncts; + VExprContextSPtrs _projections; + vectorized::Block _origin_block; VExprContextSPtrs _common_expr_ctxs_push_down; // Late arriving runtime filters will update _conjuncts. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org