This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 9d23ccf1f2a [Improvement](schema scan) Use async scanner for schema scanners (#38… (#38666) 9d23ccf1f2a is described below commit 9d23ccf1f2ae3fd6c3f3adc7af428c8d116f13db Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Thu Aug 1 16:05:24 2024 +0800 [Improvement](schema scan) Use async scanner for schema scanners (#38… (#38666) …403) --- be/src/exec/schema_scanner.cpp | 69 +++++++++++++++++++++- be/src/exec/schema_scanner.h | 24 +++++++- .../schema_active_queries_scanner.cpp | 2 +- .../schema_scanner/schema_active_queries_scanner.h | 2 +- .../schema_scanner/schema_backend_active_tasks.cpp | 3 +- .../schema_scanner/schema_backend_active_tasks.h | 2 +- .../schema_scanner/schema_charsets_scanner.cpp | 2 +- .../exec/schema_scanner/schema_charsets_scanner.h | 2 +- .../schema_scanner/schema_collations_scanner.cpp | 2 +- .../schema_scanner/schema_collations_scanner.h | 2 +- .../exec/schema_scanner/schema_columns_scanner.cpp | 2 +- .../exec/schema_scanner/schema_columns_scanner.h | 2 +- .../exec/schema_scanner/schema_dummy_scanner.cpp | 2 +- be/src/exec/schema_scanner/schema_dummy_scanner.h | 2 +- .../exec/schema_scanner/schema_files_scanner.cpp | 2 +- be/src/exec/schema_scanner/schema_files_scanner.h | 2 +- .../schema_metadata_name_ids_scanner.cpp | 2 +- .../schema_metadata_name_ids_scanner.h | 2 +- .../schema_scanner/schema_partitions_scanner.cpp | 2 +- .../schema_scanner/schema_partitions_scanner.h | 2 +- .../schema_scanner/schema_processlist_scanner.cpp | 2 +- .../schema_scanner/schema_processlist_scanner.h | 2 +- .../schema_scanner/schema_profiling_scanner.cpp | 2 +- .../exec/schema_scanner/schema_profiling_scanner.h | 2 +- .../exec/schema_scanner/schema_routine_scanner.cpp | 2 +- .../exec/schema_scanner/schema_routine_scanner.h | 2 +- .../exec/schema_scanner/schema_rowsets_scanner.cpp | 2 +- .../exec/schema_scanner/schema_rowsets_scanner.h | 2 +- .../schema_schema_privileges_scanner.cpp | 2 +- .../schema_schema_privileges_scanner.h | 2 +- .../schema_scanner/schema_schemata_scanner.cpp | 2 +- .../exec/schema_scanner/schema_schemata_scanner.h | 2 +- .../schema_table_privileges_scanner.cpp | 2 +- .../schema_table_privileges_scanner.h | 2 +- .../exec/schema_scanner/schema_tables_scanner.cpp | 2 +- be/src/exec/schema_scanner/schema_tables_scanner.h | 2 +- .../schema_user_privileges_scanner.cpp | 2 +- .../schema_user_privileges_scanner.h | 2 +- be/src/exec/schema_scanner/schema_user_scanner.cpp | 2 +- be/src/exec/schema_scanner/schema_user_scanner.h | 2 +- .../schema_scanner/schema_variables_scanner.cpp | 2 +- .../exec/schema_scanner/schema_variables_scanner.h | 2 +- .../exec/schema_scanner/schema_views_scanner.cpp | 2 +- be/src/exec/schema_scanner/schema_views_scanner.h | 2 +- .../schema_workload_groups_scanner.cpp | 2 +- .../schema_workload_groups_scanner.h | 2 +- .../schema_workload_sched_policy_scanner.cpp | 3 +- .../schema_workload_sched_policy_scanner.h | 2 +- be/src/pipeline/exec/schema_scan_operator.cpp | 9 ++- be/src/pipeline/exec/schema_scan_operator.h | 14 ++++- be/src/pipeline/pipeline_x/dependency.h | 16 +---- be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 2 - be/src/vec/exec/vschema_scan_node.cpp | 2 +- 53 files changed, 162 insertions(+), 68 deletions(-) diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp index 2b6b2c1f3c0..d9cafcf9049 100644 --- a/be/src/exec/schema_scanner.cpp +++ b/be/src/exec/schema_scanner.cpp @@ -50,7 +50,10 @@ #include "exec/schema_scanner/schema_workload_groups_scanner.h" #include "exec/schema_scanner/schema_workload_sched_policy_scanner.h" #include "olap/hll.h" +#include "pipeline/pipeline_x/dependency.h" #include "runtime/define_primitive_type.h" +#include "runtime/fragment_mgr.h" +#include "runtime/types.h" #include "util/string_util.h" #include "util/types.h" #include "vec/columns/column.h" @@ -64,6 +67,7 @@ #include "vec/core/column_with_type_and_name.h" #include "vec/core/types.h" #include "vec/data_types/data_type.h" +#include "vec/data_types/data_type_factory.hpp" namespace doris { class ObjectPool; @@ -84,7 +88,60 @@ Status SchemaScanner::start(RuntimeState* state) { return Status::OK(); } -Status SchemaScanner::get_next_block(vectorized::Block* block, bool* eos) { +Status SchemaScanner::get_next_block(RuntimeState* state, vectorized::Block* block, bool* eos) { + if (_data_block == nullptr) { + return Status::InternalError("No data left!"); + } + DCHECK(_async_thread_running == false); + RETURN_IF_ERROR(_scanner_status.status()); + for (size_t i = 0; i < block->columns(); i++) { + std::move(*block->get_by_position(i).column) + .mutate() + ->insert_range_from(*_data_block->get_by_position(i).column, 0, + _data_block->rows()); + } + _data_block->clear_column_data(); + *eos = _eos; + if (!*eos) { + RETURN_IF_ERROR(get_next_block_async(state)); + } + return Status::OK(); +} + +Status SchemaScanner::get_next_block_async(RuntimeState* state) { + _dependency->block(); + auto task_ctx = state->get_task_execution_context(); + RETURN_IF_ERROR(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func( + [this, task_ctx, state]() { + DCHECK(_async_thread_running == false); + auto task_lock = task_ctx.lock(); + if (task_lock == nullptr) { + _scanner_status.update(Status::InternalError("Task context not exists!")); + return; + } + SCOPED_ATTACH_TASK(state); + _dependency->block(); + _async_thread_running = true; + _finish_dependency->block(); + if (!_opened) { + _data_block = vectorized::Block::create_unique(); + _init_block(_data_block.get()); + _scanner_status.update(start(state)); + _opened = true; + } + bool eos = false; + _scanner_status.update(get_next_block_internal(_data_block.get(), &eos)); + _eos = eos; + _async_thread_running = false; + _dependency->set_ready(); + if (eos) { + _finish_dependency->set_ready(); + } + })); + return Status::OK(); +} + +Status SchemaScanner::get_next_block_internal(vectorized::Block* block, bool* eos) { if (!_is_init) { return Status::InternalError("used before initialized."); } @@ -173,6 +230,16 @@ std::unique_ptr<SchemaScanner> SchemaScanner::create(TSchemaTableType::type type } } +void SchemaScanner::_init_block(vectorized::Block* src_block) { + const std::vector<SchemaScanner::ColumnDesc>& columns_desc(get_column_desc()); + for (int i = 0; i < columns_desc.size(); ++i) { + TypeDescriptor descriptor(columns_desc[i].type); + auto data_type = vectorized::DataTypeFactory::instance().create_data_type(descriptor, true); + src_block->insert(vectorized::ColumnWithTypeAndName(data_type->create_column(), data_type, + columns_desc[i].name)); + } +} + Status SchemaScanner::fill_dest_column_for_range(vectorized::Block* block, size_t pos, const std::vector<void*>& datas) { const ColumnDesc& col_desc = _columns[pos]; diff --git a/be/src/exec/schema_scanner.h b/be/src/exec/schema_scanner.h index a23706ac6a4..4666657af21 100644 --- a/be/src/exec/schema_scanner.h +++ b/be/src/exec/schema_scanner.h @@ -22,6 +22,7 @@ #include <stddef.h> #include <stdint.h> +#include <condition_variable> #include <memory> #include <string> #include <vector> @@ -43,6 +44,10 @@ namespace vectorized { class Block; } +namespace pipeline { +class Dependency; +} + struct SchemaScannerCommonParam { SchemaScannerCommonParam() : db(nullptr), @@ -94,15 +99,23 @@ public: // init object need information, schema etc. virtual Status init(SchemaScannerParam* param, ObjectPool* pool); + Status get_next_block(RuntimeState* state, vectorized::Block* block, bool* eos); // Start to work virtual Status start(RuntimeState* state); - virtual Status get_next_block(vectorized::Block* block, bool* eos); + virtual Status get_next_block_internal(vectorized::Block* block, bool* eos); const std::vector<ColumnDesc>& get_column_desc() const { return _columns; } // factory function static std::unique_ptr<SchemaScanner> create(TSchemaTableType::type type); TSchemaTableType::type type() const { return _schema_table_type; } + void set_dependency(std::shared_ptr<pipeline::Dependency> dep, + std::shared_ptr<pipeline::Dependency> fin_dep) { + _dependency = dep; + _finish_dependency = fin_dep; + } + Status get_next_block_async(RuntimeState* state); protected: + void _init_block(vectorized::Block* src_block); Status fill_dest_column_for_range(vectorized::Block* block, size_t pos, const std::vector<void*>& datas); @@ -125,6 +138,15 @@ protected: RuntimeProfile::Counter* _get_table_timer = nullptr; RuntimeProfile::Counter* _get_describe_timer = nullptr; RuntimeProfile::Counter* _fill_block_timer = nullptr; + + std::shared_ptr<pipeline::Dependency> _dependency = nullptr; + std::shared_ptr<pipeline::Dependency> _finish_dependency = nullptr; + + std::unique_ptr<vectorized::Block> _data_block; + AtomicStatus _scanner_status; + std::atomic<bool> _eos = false; + std::atomic<bool> _opened = false; + std::atomic<bool> _async_thread_running = false; }; } // namespace doris diff --git a/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp b/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp index 2115a38a6eb..46522a36242 100644 --- a/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp @@ -137,7 +137,7 @@ Status SchemaActiveQueriesScanner::_get_active_queries_block_from_fe() { return Status::OK(); } -Status SchemaActiveQueriesScanner::get_next_block(vectorized::Block* block, bool* eos) { +Status SchemaActiveQueriesScanner::get_next_block_internal(vectorized::Block* block, bool* eos) { if (!_is_init) { return Status::InternalError("Used before initialized."); } diff --git a/be/src/exec/schema_scanner/schema_active_queries_scanner.h b/be/src/exec/schema_scanner/schema_active_queries_scanner.h index 1df5b1f9d74..7e9ae4b8034 100644 --- a/be/src/exec/schema_scanner/schema_active_queries_scanner.h +++ b/be/src/exec/schema_scanner/schema_active_queries_scanner.h @@ -36,7 +36,7 @@ public: ~SchemaActiveQueriesScanner() override; Status start(RuntimeState* state) override; - Status get_next_block(vectorized::Block* block, bool* eos) override; + Status get_next_block_internal(vectorized::Block* block, bool* eos) override; static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns; diff --git a/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp index f1155796ed4..b35e84a9f9c 100644 --- a/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp +++ b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp @@ -51,7 +51,8 @@ Status SchemaBackendActiveTasksScanner::start(RuntimeState* state) { return Status::OK(); } -Status SchemaBackendActiveTasksScanner::get_next_block(vectorized::Block* block, bool* eos) { +Status SchemaBackendActiveTasksScanner::get_next_block_internal(vectorized::Block* block, + bool* eos) { if (!_is_init) { return Status::InternalError("Used before initialized."); } diff --git a/be/src/exec/schema_scanner/schema_backend_active_tasks.h b/be/src/exec/schema_scanner/schema_backend_active_tasks.h index d8a2a1ffa3f..43819818b57 100644 --- a/be/src/exec/schema_scanner/schema_backend_active_tasks.h +++ b/be/src/exec/schema_scanner/schema_backend_active_tasks.h @@ -36,7 +36,7 @@ public: ~SchemaBackendActiveTasksScanner() override; Status start(RuntimeState* state) override; - Status get_next_block(vectorized::Block* block, bool* eos) override; + Status get_next_block_internal(vectorized::Block* block, bool* eos) override; static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns; diff --git a/be/src/exec/schema_scanner/schema_charsets_scanner.cpp b/be/src/exec/schema_scanner/schema_charsets_scanner.cpp index 9bd7ad7919c..ff42e7f5a05 100644 --- a/be/src/exec/schema_scanner/schema_charsets_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_charsets_scanner.cpp @@ -48,7 +48,7 @@ SchemaCharsetsScanner::SchemaCharsetsScanner() SchemaCharsetsScanner::~SchemaCharsetsScanner() {} -Status SchemaCharsetsScanner::get_next_block(vectorized::Block* block, bool* eos) { +Status SchemaCharsetsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) { if (!_is_init) { return Status::InternalError("call this before initial."); } diff --git a/be/src/exec/schema_scanner/schema_charsets_scanner.h b/be/src/exec/schema_scanner/schema_charsets_scanner.h index 1f01070875c..d5089c62826 100644 --- a/be/src/exec/schema_scanner/schema_charsets_scanner.h +++ b/be/src/exec/schema_scanner/schema_charsets_scanner.h @@ -36,7 +36,7 @@ public: SchemaCharsetsScanner(); ~SchemaCharsetsScanner() override; - Status get_next_block(vectorized::Block* block, bool* eos) override; + Status get_next_block_internal(vectorized::Block* block, bool* eos) override; private: struct CharsetStruct { diff --git a/be/src/exec/schema_scanner/schema_collations_scanner.cpp b/be/src/exec/schema_scanner/schema_collations_scanner.cpp index 812a8cff18e..271c9a6fb78 100644 --- a/be/src/exec/schema_scanner/schema_collations_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_collations_scanner.cpp @@ -50,7 +50,7 @@ SchemaCollationsScanner::SchemaCollationsScanner() SchemaCollationsScanner::~SchemaCollationsScanner() {} -Status SchemaCollationsScanner::get_next_block(vectorized::Block* block, bool* eos) { +Status SchemaCollationsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) { if (!_is_init) { return Status::InternalError("call this before initial."); } diff --git a/be/src/exec/schema_scanner/schema_collations_scanner.h b/be/src/exec/schema_scanner/schema_collations_scanner.h index f0f60538cac..2fe200da78d 100644 --- a/be/src/exec/schema_scanner/schema_collations_scanner.h +++ b/be/src/exec/schema_scanner/schema_collations_scanner.h @@ -36,7 +36,7 @@ public: SchemaCollationsScanner(); ~SchemaCollationsScanner() override; - Status get_next_block(vectorized::Block* block, bool* eos) override; + Status get_next_block_internal(vectorized::Block* block, bool* eos) override; private: struct CollationStruct { diff --git a/be/src/exec/schema_scanner/schema_columns_scanner.cpp b/be/src/exec/schema_scanner/schema_columns_scanner.cpp index 763f24b9e53..440370c36c9 100644 --- a/be/src/exec/schema_scanner/schema_columns_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_columns_scanner.cpp @@ -347,7 +347,7 @@ Status SchemaColumnsScanner::_get_new_table() { return Status::OK(); } -Status SchemaColumnsScanner::get_next_block(vectorized::Block* block, bool* eos) { +Status SchemaColumnsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) { if (!_is_init) { return Status::InternalError("use this class before inited."); } diff --git a/be/src/exec/schema_scanner/schema_columns_scanner.h b/be/src/exec/schema_scanner/schema_columns_scanner.h index 2499db7ed82..99150c36d10 100644 --- a/be/src/exec/schema_scanner/schema_columns_scanner.h +++ b/be/src/exec/schema_scanner/schema_columns_scanner.h @@ -38,7 +38,7 @@ public: SchemaColumnsScanner(); ~SchemaColumnsScanner() override; Status start(RuntimeState* state) override; - Status get_next_block(vectorized::Block* block, bool* eos) override; + Status get_next_block_internal(vectorized::Block* block, bool* eos) override; private: Status _get_new_table(); diff --git a/be/src/exec/schema_scanner/schema_dummy_scanner.cpp b/be/src/exec/schema_scanner/schema_dummy_scanner.cpp index 1d5956f390e..9e3a703d9fb 100644 --- a/be/src/exec/schema_scanner/schema_dummy_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_dummy_scanner.cpp @@ -40,7 +40,7 @@ Status SchemaDummyScanner::start(RuntimeState* state) { return Status::OK(); } -Status SchemaDummyScanner::get_next_block(vectorized::Block* block, bool* eos) { +Status SchemaDummyScanner::get_next_block_internal(vectorized::Block* block, bool* eos) { *eos = true; return Status::OK(); } diff --git a/be/src/exec/schema_scanner/schema_dummy_scanner.h b/be/src/exec/schema_scanner/schema_dummy_scanner.h index a67f6fa25c1..0c5e4aabe35 100644 --- a/be/src/exec/schema_scanner/schema_dummy_scanner.h +++ b/be/src/exec/schema_scanner/schema_dummy_scanner.h @@ -33,7 +33,7 @@ public: SchemaDummyScanner(); ~SchemaDummyScanner() override; Status start(RuntimeState* state = nullptr) override; - Status get_next_block(vectorized::Block* block, bool* eos) override; + Status get_next_block_internal(vectorized::Block* block, bool* eos) override; }; } // namespace doris diff --git a/be/src/exec/schema_scanner/schema_files_scanner.cpp b/be/src/exec/schema_scanner/schema_files_scanner.cpp index 55b7a338c31..20aa07fa691 100644 --- a/be/src/exec/schema_scanner/schema_files_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_files_scanner.cpp @@ -113,7 +113,7 @@ Status SchemaFilesScanner::start(RuntimeState* state) { return Status::OK(); } -Status SchemaFilesScanner::get_next_block(vectorized::Block* block, bool* eos) { +Status SchemaFilesScanner::get_next_block_internal(vectorized::Block* block, bool* eos) { if (!_is_init) { return Status::InternalError("Used before initialized."); } diff --git a/be/src/exec/schema_scanner/schema_files_scanner.h b/be/src/exec/schema_scanner/schema_files_scanner.h index 6805a04be4a..bb3b2d68493 100644 --- a/be/src/exec/schema_scanner/schema_files_scanner.h +++ b/be/src/exec/schema_scanner/schema_files_scanner.h @@ -38,7 +38,7 @@ public: ~SchemaFilesScanner() override; Status start(RuntimeState* state) override; - Status get_next_block(vectorized::Block* block, bool* eos) override; + Status get_next_block_internal(vectorized::Block* block, bool* eos) override; int _db_index; int _table_index; diff --git a/be/src/exec/schema_scanner/schema_metadata_name_ids_scanner.cpp b/be/src/exec/schema_scanner/schema_metadata_name_ids_scanner.cpp index ef7b2b69c1e..aacd9431524 100644 --- a/be/src/exec/schema_scanner/schema_metadata_name_ids_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_metadata_name_ids_scanner.cpp @@ -225,7 +225,7 @@ Status SchemaMetadataNameIdsScanner::_fill_block_impl(vectorized::Block* block) return Status::OK(); } -Status SchemaMetadataNameIdsScanner::get_next_block(vectorized::Block* block, bool* eos) { +Status SchemaMetadataNameIdsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) { if (!_is_init) { return Status::InternalError("Used before initialized."); } diff --git a/be/src/exec/schema_scanner/schema_metadata_name_ids_scanner.h b/be/src/exec/schema_scanner/schema_metadata_name_ids_scanner.h index 9981d441d85..c3beea77697 100644 --- a/be/src/exec/schema_scanner/schema_metadata_name_ids_scanner.h +++ b/be/src/exec/schema_scanner/schema_metadata_name_ids_scanner.h @@ -39,7 +39,7 @@ public: ~SchemaMetadataNameIdsScanner() override; Status start(RuntimeState* state) override; - Status get_next_block(vectorized::Block* block, bool* eos) override; + Status get_next_block_internal(vectorized::Block* block, bool* eos) override; private: Status _get_new_table(); diff --git a/be/src/exec/schema_scanner/schema_partitions_scanner.cpp b/be/src/exec/schema_scanner/schema_partitions_scanner.cpp index f1ad1f594f8..ea7394e15e1 100644 --- a/be/src/exec/schema_scanner/schema_partitions_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_partitions_scanner.cpp @@ -101,7 +101,7 @@ Status SchemaPartitionsScanner::start(RuntimeState* state) { return Status::OK(); } -Status SchemaPartitionsScanner::get_next_block(vectorized::Block* block, bool* eos) { +Status SchemaPartitionsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) { if (!_is_init) { return Status::InternalError("Used before initialized."); } diff --git a/be/src/exec/schema_scanner/schema_partitions_scanner.h b/be/src/exec/schema_scanner/schema_partitions_scanner.h index 47e1d1fcf87..87e55db984a 100644 --- a/be/src/exec/schema_scanner/schema_partitions_scanner.h +++ b/be/src/exec/schema_scanner/schema_partitions_scanner.h @@ -38,7 +38,7 @@ public: ~SchemaPartitionsScanner() override; Status start(RuntimeState* state) override; - Status get_next_block(vectorized::Block* block, bool* eos) override; + Status get_next_block_internal(vectorized::Block* block, bool* eos) override; int _db_index; int _table_index; diff --git a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp index f5f5bc23634..f1071359d0a 100644 --- a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp @@ -62,7 +62,7 @@ Status SchemaProcessListScanner::start(RuntimeState* state) { return Status::OK(); } -Status SchemaProcessListScanner::get_next_block(vectorized::Block* block, bool* eos) { +Status SchemaProcessListScanner::get_next_block_internal(vectorized::Block* block, bool* eos) { if (!_is_init) { return Status::InternalError("call this before initial."); } diff --git a/be/src/exec/schema_scanner/schema_processlist_scanner.h b/be/src/exec/schema_scanner/schema_processlist_scanner.h index 8aae87e1ef6..c0b0a47f615 100644 --- a/be/src/exec/schema_scanner/schema_processlist_scanner.h +++ b/be/src/exec/schema_scanner/schema_processlist_scanner.h @@ -40,7 +40,7 @@ public: ~SchemaProcessListScanner() override; Status start(RuntimeState* state) override; - Status get_next_block(vectorized::Block* block, bool* eos) override; + Status get_next_block_internal(vectorized::Block* block, bool* eos) override; static std::vector<SchemaScanner::ColumnDesc> _s_processlist_columns; diff --git a/be/src/exec/schema_scanner/schema_profiling_scanner.cpp b/be/src/exec/schema_scanner/schema_profiling_scanner.cpp index 2f71eb96f26..0a2a64330bb 100644 --- a/be/src/exec/schema_scanner/schema_profiling_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_profiling_scanner.cpp @@ -88,7 +88,7 @@ Status SchemaProfilingScanner::start(RuntimeState* state) { return Status::OK(); } -Status SchemaProfilingScanner::get_next_block(vectorized::Block* block, bool* eos) { +Status SchemaProfilingScanner::get_next_block_internal(vectorized::Block* block, bool* eos) { if (!_is_init) { return Status::InternalError("Used before initialized."); } diff --git a/be/src/exec/schema_scanner/schema_profiling_scanner.h b/be/src/exec/schema_scanner/schema_profiling_scanner.h index 5399cb14eb4..6b969a478ac 100644 --- a/be/src/exec/schema_scanner/schema_profiling_scanner.h +++ b/be/src/exec/schema_scanner/schema_profiling_scanner.h @@ -38,7 +38,7 @@ public: ~SchemaProfilingScanner() override; Status start(RuntimeState* state) override; - Status get_next_block(vectorized::Block* block, bool* eos) override; + Status get_next_block_internal(vectorized::Block* block, bool* eos) override; static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns; }; diff --git a/be/src/exec/schema_scanner/schema_routine_scanner.cpp b/be/src/exec/schema_scanner/schema_routine_scanner.cpp index 3d55addee6c..8c263c99d2d 100644 --- a/be/src/exec/schema_scanner/schema_routine_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_routine_scanner.cpp @@ -141,7 +141,7 @@ Status SchemaRoutinesScanner::get_block_from_fe() { return Status::OK(); } -Status SchemaRoutinesScanner::get_next_block(vectorized::Block* block, bool* eos) { +Status SchemaRoutinesScanner::get_next_block_internal(vectorized::Block* block, bool* eos) { if (!_is_init) { return Status::InternalError("Used before initialized."); } diff --git a/be/src/exec/schema_scanner/schema_routine_scanner.h b/be/src/exec/schema_scanner/schema_routine_scanner.h index 543f9e8e8f6..c60d72340e1 100644 --- a/be/src/exec/schema_scanner/schema_routine_scanner.h +++ b/be/src/exec/schema_scanner/schema_routine_scanner.h @@ -36,7 +36,7 @@ public: ~SchemaRoutinesScanner() override = default; Status start(RuntimeState* state) override; - Status get_next_block(vectorized::Block* block, bool* eos) override; + Status get_next_block_internal(vectorized::Block* block, bool* eos) override; static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns; diff --git a/be/src/exec/schema_scanner/schema_rowsets_scanner.cpp b/be/src/exec/schema_scanner/schema_rowsets_scanner.cpp index b760d8bde04..8bfd785ce5d 100644 --- a/be/src/exec/schema_scanner/schema_rowsets_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_rowsets_scanner.cpp @@ -92,7 +92,7 @@ Status SchemaRowsetsScanner::_get_all_rowsets() { return Status::OK(); } -Status SchemaRowsetsScanner::get_next_block(vectorized::Block* block, bool* eos) { +Status SchemaRowsetsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) { if (!_is_init) { return Status::InternalError("Used before initialized."); } diff --git a/be/src/exec/schema_scanner/schema_rowsets_scanner.h b/be/src/exec/schema_scanner/schema_rowsets_scanner.h index b975cc4231b..cad34fc0494 100644 --- a/be/src/exec/schema_scanner/schema_rowsets_scanner.h +++ b/be/src/exec/schema_scanner/schema_rowsets_scanner.h @@ -40,7 +40,7 @@ public: ~SchemaRowsetsScanner() override = default; Status start(RuntimeState* state) override; - Status get_next_block(vectorized::Block* block, bool* eos) override; + Status get_next_block_internal(vectorized::Block* block, bool* eos) override; private: Status _get_all_rowsets(); diff --git a/be/src/exec/schema_scanner/schema_schema_privileges_scanner.cpp b/be/src/exec/schema_scanner/schema_schema_privileges_scanner.cpp index 09c470ff50a..c24b1fbb071 100644 --- a/be/src/exec/schema_scanner/schema_schema_privileges_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_schema_privileges_scanner.cpp @@ -82,7 +82,7 @@ Status SchemaSchemaPrivilegesScanner::_get_new_table() { return Status::OK(); } -Status SchemaSchemaPrivilegesScanner::get_next_block(vectorized::Block* block, bool* eos) { +Status SchemaSchemaPrivilegesScanner::get_next_block_internal(vectorized::Block* block, bool* eos) { if (!_is_init) { return Status::InternalError("Used before initialized."); } diff --git a/be/src/exec/schema_scanner/schema_schema_privileges_scanner.h b/be/src/exec/schema_scanner/schema_schema_privileges_scanner.h index af2ad49634b..9522fba908b 100644 --- a/be/src/exec/schema_scanner/schema_schema_privileges_scanner.h +++ b/be/src/exec/schema_scanner/schema_schema_privileges_scanner.h @@ -38,7 +38,7 @@ public: ~SchemaSchemaPrivilegesScanner() override; Status start(RuntimeState* state) override; - Status get_next_block(vectorized::Block* block, bool* eos) override; + Status get_next_block_internal(vectorized::Block* block, bool* eos) override; private: Status _get_new_table(); diff --git a/be/src/exec/schema_scanner/schema_schemata_scanner.cpp b/be/src/exec/schema_scanner/schema_schemata_scanner.cpp index e09817ca310..e70b3b7a32e 100644 --- a/be/src/exec/schema_scanner/schema_schemata_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_schemata_scanner.cpp @@ -81,7 +81,7 @@ Status SchemaSchemataScanner::start(RuntimeState* state) { return Status::OK(); } -Status SchemaSchemataScanner::get_next_block(vectorized::Block* block, bool* eos) { +Status SchemaSchemataScanner::get_next_block_internal(vectorized::Block* block, bool* eos) { if (!_is_init) { return Status::InternalError("Used before Initialized."); } diff --git a/be/src/exec/schema_scanner/schema_schemata_scanner.h b/be/src/exec/schema_scanner/schema_schemata_scanner.h index 46fad31af1f..39a5ddda495 100644 --- a/be/src/exec/schema_scanner/schema_schemata_scanner.h +++ b/be/src/exec/schema_scanner/schema_schemata_scanner.h @@ -38,7 +38,7 @@ public: ~SchemaSchemataScanner() override; Status start(RuntimeState* state) override; - Status get_next_block(vectorized::Block* block, bool* eos) override; + Status get_next_block_internal(vectorized::Block* block, bool* eos) override; private: Status _fill_block_impl(vectorized::Block* block); diff --git a/be/src/exec/schema_scanner/schema_table_privileges_scanner.cpp b/be/src/exec/schema_scanner/schema_table_privileges_scanner.cpp index 41a3faf7c5a..4d2e6246656 100644 --- a/be/src/exec/schema_scanner/schema_table_privileges_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_table_privileges_scanner.cpp @@ -84,7 +84,7 @@ Status SchemaTablePrivilegesScanner::_get_new_table() { return Status::OK(); } -Status SchemaTablePrivilegesScanner::get_next_block(vectorized::Block* block, bool* eos) { +Status SchemaTablePrivilegesScanner::get_next_block_internal(vectorized::Block* block, bool* eos) { if (!_is_init) { return Status::InternalError("Used before initialized."); } diff --git a/be/src/exec/schema_scanner/schema_table_privileges_scanner.h b/be/src/exec/schema_scanner/schema_table_privileges_scanner.h index aa79c88304b..4cfcc16d358 100644 --- a/be/src/exec/schema_scanner/schema_table_privileges_scanner.h +++ b/be/src/exec/schema_scanner/schema_table_privileges_scanner.h @@ -38,7 +38,7 @@ public: ~SchemaTablePrivilegesScanner() override; Status start(RuntimeState* state) override; - Status get_next_block(vectorized::Block* block, bool* eos) override; + Status get_next_block_internal(vectorized::Block* block, bool* eos) override; private: Status _get_new_table(); diff --git a/be/src/exec/schema_scanner/schema_tables_scanner.cpp b/be/src/exec/schema_scanner/schema_tables_scanner.cpp index 375ceb2c470..6aaafe1ae57 100644 --- a/be/src/exec/schema_scanner/schema_tables_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_tables_scanner.cpp @@ -342,7 +342,7 @@ Status SchemaTablesScanner::_fill_block_impl(vectorized::Block* block) { return Status::OK(); } -Status SchemaTablesScanner::get_next_block(vectorized::Block* block, bool* eos) { +Status SchemaTablesScanner::get_next_block_internal(vectorized::Block* block, bool* eos) { if (!_is_init) { return Status::InternalError("Used before initialized."); } diff --git a/be/src/exec/schema_scanner/schema_tables_scanner.h b/be/src/exec/schema_scanner/schema_tables_scanner.h index 11a96bf65d5..7f8eb11f397 100644 --- a/be/src/exec/schema_scanner/schema_tables_scanner.h +++ b/be/src/exec/schema_scanner/schema_tables_scanner.h @@ -39,7 +39,7 @@ public: ~SchemaTablesScanner() override; Status start(RuntimeState* state) override; - Status get_next_block(vectorized::Block* block, bool* eos) override; + Status get_next_block_internal(vectorized::Block* block, bool* eos) override; private: Status _get_new_table(); diff --git a/be/src/exec/schema_scanner/schema_user_privileges_scanner.cpp b/be/src/exec/schema_scanner/schema_user_privileges_scanner.cpp index b636ff65fd7..f9f4b272aaa 100644 --- a/be/src/exec/schema_scanner/schema_user_privileges_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_user_privileges_scanner.cpp @@ -81,7 +81,7 @@ Status SchemaUserPrivilegesScanner::_get_new_table() { return Status::OK(); } -Status SchemaUserPrivilegesScanner::get_next_block(vectorized::Block* block, bool* eos) { +Status SchemaUserPrivilegesScanner::get_next_block_internal(vectorized::Block* block, bool* eos) { if (!_is_init) { return Status::InternalError("Used before initialized."); } diff --git a/be/src/exec/schema_scanner/schema_user_privileges_scanner.h b/be/src/exec/schema_scanner/schema_user_privileges_scanner.h index eb8f3c63f14..ffc3840db67 100644 --- a/be/src/exec/schema_scanner/schema_user_privileges_scanner.h +++ b/be/src/exec/schema_scanner/schema_user_privileges_scanner.h @@ -38,7 +38,7 @@ public: ~SchemaUserPrivilegesScanner() override; Status start(RuntimeState* state) override; - Status get_next_block(vectorized::Block* block, bool* eos) override; + Status get_next_block_internal(vectorized::Block* block, bool* eos) override; private: Status _get_new_table(); diff --git a/be/src/exec/schema_scanner/schema_user_scanner.cpp b/be/src/exec/schema_scanner/schema_user_scanner.cpp index 9b153414380..e56f18f05ae 100644 --- a/be/src/exec/schema_scanner/schema_user_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_user_scanner.cpp @@ -76,7 +76,7 @@ Status SchemaUserScanner::start(RuntimeState* state) { return Status::OK(); } -Status SchemaUserScanner::get_next_block(vectorized::Block* block, bool* eos) { +Status SchemaUserScanner::get_next_block_internal(vectorized::Block* block, bool* eos) { if (!_is_init) { return Status::InternalError("call this before initial."); } diff --git a/be/src/exec/schema_scanner/schema_user_scanner.h b/be/src/exec/schema_scanner/schema_user_scanner.h index c55f216804d..bdc618eb5a0 100644 --- a/be/src/exec/schema_scanner/schema_user_scanner.h +++ b/be/src/exec/schema_scanner/schema_user_scanner.h @@ -40,7 +40,7 @@ public: ~SchemaUserScanner() override; Status start(RuntimeState* state) override; - Status get_next_block(vectorized::Block* block, bool* eos) override; + Status get_next_block_internal(vectorized::Block* block, bool* eos) override; static std::vector<SchemaScanner::ColumnDesc> _s_user_columns; diff --git a/be/src/exec/schema_scanner/schema_variables_scanner.cpp b/be/src/exec/schema_scanner/schema_variables_scanner.cpp index 491a11f2572..445089b36ab 100644 --- a/be/src/exec/schema_scanner/schema_variables_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_variables_scanner.cpp @@ -70,7 +70,7 @@ Status SchemaVariablesScanner::start(RuntimeState* state) { return Status::OK(); } -Status SchemaVariablesScanner::get_next_block(vectorized::Block* block, bool* eos) { +Status SchemaVariablesScanner::get_next_block_internal(vectorized::Block* block, bool* eos) { if (!_is_init) { return Status::InternalError("call this before initial."); } diff --git a/be/src/exec/schema_scanner/schema_variables_scanner.h b/be/src/exec/schema_scanner/schema_variables_scanner.h index 2d207ff8b2e..31bbacf713b 100644 --- a/be/src/exec/schema_scanner/schema_variables_scanner.h +++ b/be/src/exec/schema_scanner/schema_variables_scanner.h @@ -40,7 +40,7 @@ public: ~SchemaVariablesScanner() override; Status start(RuntimeState* state) override; - Status get_next_block(vectorized::Block* block, bool* eos) override; + Status get_next_block_internal(vectorized::Block* block, bool* eos) override; private: struct VariableStruct { diff --git a/be/src/exec/schema_scanner/schema_views_scanner.cpp b/be/src/exec/schema_scanner/schema_views_scanner.cpp index 7d9ce671a66..5b5f0c1b729 100644 --- a/be/src/exec/schema_scanner/schema_views_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_views_scanner.cpp @@ -113,7 +113,7 @@ Status SchemaViewsScanner::_get_new_table() { return Status::OK(); } -Status SchemaViewsScanner::get_next_block(vectorized::Block* block, bool* eos) { +Status SchemaViewsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) { if (!_is_init) { return Status::InternalError("Used before initialized."); } diff --git a/be/src/exec/schema_scanner/schema_views_scanner.h b/be/src/exec/schema_scanner/schema_views_scanner.h index bc473057905..b86ad922e5e 100644 --- a/be/src/exec/schema_scanner/schema_views_scanner.h +++ b/be/src/exec/schema_scanner/schema_views_scanner.h @@ -38,7 +38,7 @@ public: ~SchemaViewsScanner() override; Status start(RuntimeState* state) override; - Status get_next_block(vectorized::Block* block, bool* eos) override; + Status get_next_block_internal(vectorized::Block* block, bool* eos) override; private: Status _get_new_table(); diff --git a/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp b/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp index def52df531d..8b0c6be536b 100644 --- a/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp @@ -114,7 +114,7 @@ Status SchemaWorkloadGroupsScanner::_get_workload_groups_block_from_fe() { return Status::OK(); } -Status SchemaWorkloadGroupsScanner::get_next_block(vectorized::Block* block, bool* eos) { +Status SchemaWorkloadGroupsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) { if (!_is_init) { return Status::InternalError("Used before initialized."); } diff --git a/be/src/exec/schema_scanner/schema_workload_groups_scanner.h b/be/src/exec/schema_scanner/schema_workload_groups_scanner.h index bf7a103526d..3121c4dbac1 100644 --- a/be/src/exec/schema_scanner/schema_workload_groups_scanner.h +++ b/be/src/exec/schema_scanner/schema_workload_groups_scanner.h @@ -36,7 +36,7 @@ public: ~SchemaWorkloadGroupsScanner() override; Status start(RuntimeState* state) override; - Status get_next_block(vectorized::Block* block, bool* eos) override; + Status get_next_block_internal(vectorized::Block* block, bool* eos) override; static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns; diff --git a/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp b/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp index 035d3bfe217..2d91f151f5f 100644 --- a/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp @@ -106,7 +106,8 @@ Status SchemaWorkloadSchedulePolicyScanner::_get_workload_schedule_policy_block_ return Status::OK(); } -Status SchemaWorkloadSchedulePolicyScanner::get_next_block(vectorized::Block* block, bool* eos) { +Status SchemaWorkloadSchedulePolicyScanner::get_next_block_internal(vectorized::Block* block, + bool* eos) { if (!_is_init) { return Status::InternalError("Used before initialized."); } diff --git a/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.h b/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.h index 5284975fe66..da8d9f15c49 100644 --- a/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.h +++ b/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.h @@ -36,7 +36,7 @@ public: ~SchemaWorkloadSchedulePolicyScanner() override; Status start(RuntimeState* state) override; - Status get_next_block(vectorized::Block* block, bool* eos) override; + Status get_next_block_internal(vectorized::Block* block, bool* eos) override; static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns; diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp b/be/src/pipeline/exec/schema_scan_operator.cpp index f26b2d706b7..8ff05cc41b7 100644 --- a/be/src/pipeline/exec/schema_scan_operator.cpp +++ b/be/src/pipeline/exec/schema_scan_operator.cpp @@ -61,6 +61,7 @@ Status SchemaScanLocalState::init(RuntimeState* state, LocalStateInfo& info) { // new one scanner _schema_scanner = SchemaScanner::create(schema_table->schema_table_type()); + _schema_scanner->set_dependency(_data_dependency, _finish_dependency); if (nullptr == _schema_scanner) { return Status::InternalError("schema scanner get nullptr pointer."); } @@ -72,7 +73,7 @@ Status SchemaScanLocalState::open(RuntimeState* state) { SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); RETURN_IF_ERROR(PipelineXLocalState<>::open(state)); - return _schema_scanner->start(state); + return _schema_scanner->get_next_block_async(state); } SchemaScanOperatorX::SchemaScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, @@ -239,8 +240,12 @@ Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block* bl while (true) { RETURN_IF_CANCELLED(state); + if (local_state._data_dependency->is_blocked_by() != nullptr) { + break; + } // get all slots from schema table. - RETURN_IF_ERROR(local_state._schema_scanner->get_next_block(&src_block, &schema_eos)); + RETURN_IF_ERROR( + local_state._schema_scanner->get_next_block(state, &src_block, &schema_eos)); if (schema_eos) { *eos = true; diff --git a/be/src/pipeline/exec/schema_scan_operator.h b/be/src/pipeline/exec/schema_scan_operator.h index bd336132efb..c026c105e95 100644 --- a/be/src/pipeline/exec/schema_scan_operator.h +++ b/be/src/pipeline/exec/schema_scan_operator.h @@ -55,18 +55,30 @@ public: ENABLE_FACTORY_CREATOR(SchemaScanLocalState); SchemaScanLocalState(RuntimeState* state, OperatorXBase* parent) - : PipelineXLocalState<>(state, parent) {} + : PipelineXLocalState<>(state, parent) { + _finish_dependency = + std::make_shared<Dependency>(parent->operator_id(), parent->node_id(), + parent->get_name() + "_FINISH_DEPENDENCY", true); + _data_dependency = std::make_shared<Dependency>(parent->operator_id(), parent->node_id(), + parent->get_name() + "_DEPENDENCY", true); + } ~SchemaScanLocalState() override = default; Status init(RuntimeState* state, LocalStateInfo& info) override; Status open(RuntimeState* state) override; + Dependency* finishdependency() override { return _finish_dependency.get(); } + std::vector<Dependency*> dependencies() const override { return {_data_dependency.get()}; } + private: friend class SchemaScanOperatorX; SchemaScannerParam _scanner_param; std::unique_ptr<SchemaScanner> _schema_scanner; + + std::shared_ptr<Dependency> _finish_dependency; + std::shared_ptr<Dependency> _data_dependency; }; class SchemaScanOperatorX final : public OperatorX<SchemaScanLocalState> { diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index c4b8b9b9ff0..b43ddb653a8 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -89,20 +89,11 @@ class Dependency : public std::enable_shared_from_this<Dependency> { public: ENABLE_FACTORY_CREATOR(Dependency); Dependency(int id, int node_id, std::string name) - : _id(id), - _node_id(node_id), - _name(std::move(name)), - _is_write_dependency(false), - _ready(false) {} + : _id(id), _node_id(node_id), _name(std::move(name)), _ready(false) {} Dependency(int id, int node_id, std::string name, bool ready) - : _id(id), - _node_id(node_id), - _name(std::move(name)), - _is_write_dependency(true), - _ready(ready) {} + : _id(id), _node_id(node_id), _name(std::move(name)), _ready(ready) {} virtual ~Dependency() = default; - bool is_write_dependency() const { return _is_write_dependency; } [[nodiscard]] int id() const { return _id; } [[nodiscard]] virtual std::string name() const { return _name; } BasicSharedState* shared_state() { return _shared_state; } @@ -119,12 +110,10 @@ public: // Notify downstream pipeline tasks this dependency is ready. void set_ready(); void set_ready_to_read() { - DCHECK(_is_write_dependency) << debug_string(); DCHECK(_shared_state->source_deps.size() == 1) << debug_string(); _shared_state->source_deps.front()->set_ready(); } void set_block_to_read() { - DCHECK(_is_write_dependency) << debug_string(); DCHECK(_shared_state->source_deps.size() == 1) << debug_string(); _shared_state->source_deps.front()->block(); } @@ -167,7 +156,6 @@ protected: const int _id; const int _node_id; const std::string _name; - const bool _is_write_dependency; std::atomic<bool> _ready; BasicSharedState* _shared_state = nullptr; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index b723fe02d7a..f05b491d50b 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -149,8 +149,6 @@ Status PipelineXTask::_extract_dependencies() { { auto* local_state = _state->get_sink_local_state(); write_dependencies = local_state->dependencies(); - DCHECK(std::all_of(write_dependencies.begin(), write_dependencies.end(), - [](auto* dep) { return dep->is_write_dependency(); })); auto* fin_dep = local_state->finishdependency(); if (fin_dep) { finish_dependencies.push_back(fin_dep); diff --git a/be/src/vec/exec/vschema_scan_node.cpp b/be/src/vec/exec/vschema_scan_node.cpp index 9ac4362318b..9e8c7a8d20e 100644 --- a/be/src/vec/exec/vschema_scan_node.cpp +++ b/be/src/vec/exec/vschema_scan_node.cpp @@ -249,7 +249,7 @@ Status VSchemaScanNode::get_next(RuntimeState* state, vectorized::Block* block, RETURN_IF_CANCELLED(state); // get all slots from schema table. - RETURN_IF_ERROR(_schema_scanner->get_next_block(&src_block, &schema_eos)); + RETURN_IF_ERROR(_schema_scanner->get_next_block_internal(&src_block, &schema_eos)); if (schema_eos) { *eos = true; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org