This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 9d23ccf1f2a [Improvement](schema scan) Use async scanner for schema scanners (#38… (#38666)
9d23ccf1f2a is described below

commit 9d23ccf1f2ae3fd6c3f3adc7af428c8d116f13db
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Thu Aug 1 16:05:24 2024 +0800

    [Improvement](schema scan) Use async scanner for schema scanners (#38… (#38666)
    
    …403)
---
 be/src/exec/schema_scanner.cpp                     | 69 +++++++++++++++++++++-
 be/src/exec/schema_scanner.h                       | 24 +++++++-
 .../schema_active_queries_scanner.cpp              |  2 +-
 .../schema_scanner/schema_active_queries_scanner.h |  2 +-
 .../schema_scanner/schema_backend_active_tasks.cpp |  3 +-
 .../schema_scanner/schema_backend_active_tasks.h   |  2 +-
 .../schema_scanner/schema_charsets_scanner.cpp     |  2 +-
 .../exec/schema_scanner/schema_charsets_scanner.h  |  2 +-
 .../schema_scanner/schema_collations_scanner.cpp   |  2 +-
 .../schema_scanner/schema_collations_scanner.h     |  2 +-
 .../exec/schema_scanner/schema_columns_scanner.cpp |  2 +-
 .../exec/schema_scanner/schema_columns_scanner.h   |  2 +-
 .../exec/schema_scanner/schema_dummy_scanner.cpp   |  2 +-
 be/src/exec/schema_scanner/schema_dummy_scanner.h  |  2 +-
 .../exec/schema_scanner/schema_files_scanner.cpp   |  2 +-
 be/src/exec/schema_scanner/schema_files_scanner.h  |  2 +-
 .../schema_metadata_name_ids_scanner.cpp           |  2 +-
 .../schema_metadata_name_ids_scanner.h             |  2 +-
 .../schema_scanner/schema_partitions_scanner.cpp   |  2 +-
 .../schema_scanner/schema_partitions_scanner.h     |  2 +-
 .../schema_scanner/schema_processlist_scanner.cpp  |  2 +-
 .../schema_scanner/schema_processlist_scanner.h    |  2 +-
 .../schema_scanner/schema_profiling_scanner.cpp    |  2 +-
 .../exec/schema_scanner/schema_profiling_scanner.h |  2 +-
 .../exec/schema_scanner/schema_routine_scanner.cpp |  2 +-
 .../exec/schema_scanner/schema_routine_scanner.h   |  2 +-
 .../exec/schema_scanner/schema_rowsets_scanner.cpp |  2 +-
 .../exec/schema_scanner/schema_rowsets_scanner.h   |  2 +-
 .../schema_schema_privileges_scanner.cpp           |  2 +-
 .../schema_schema_privileges_scanner.h             |  2 +-
 .../schema_scanner/schema_schemata_scanner.cpp     |  2 +-
 .../exec/schema_scanner/schema_schemata_scanner.h  |  2 +-
 .../schema_table_privileges_scanner.cpp            |  2 +-
 .../schema_table_privileges_scanner.h              |  2 +-
 .../exec/schema_scanner/schema_tables_scanner.cpp  |  2 +-
 be/src/exec/schema_scanner/schema_tables_scanner.h |  2 +-
 .../schema_user_privileges_scanner.cpp             |  2 +-
 .../schema_user_privileges_scanner.h               |  2 +-
 be/src/exec/schema_scanner/schema_user_scanner.cpp |  2 +-
 be/src/exec/schema_scanner/schema_user_scanner.h   |  2 +-
 .../schema_scanner/schema_variables_scanner.cpp    |  2 +-
 .../exec/schema_scanner/schema_variables_scanner.h |  2 +-
 .../exec/schema_scanner/schema_views_scanner.cpp   |  2 +-
 be/src/exec/schema_scanner/schema_views_scanner.h  |  2 +-
 .../schema_workload_groups_scanner.cpp             |  2 +-
 .../schema_workload_groups_scanner.h               |  2 +-
 .../schema_workload_sched_policy_scanner.cpp       |  3 +-
 .../schema_workload_sched_policy_scanner.h         |  2 +-
 be/src/pipeline/exec/schema_scan_operator.cpp      |  9 ++-
 be/src/pipeline/exec/schema_scan_operator.h        | 14 ++++-
 be/src/pipeline/pipeline_x/dependency.h            | 16 +----
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp     |  2 -
 be/src/vec/exec/vschema_scan_node.cpp              |  2 +-
 53 files changed, 162 insertions(+), 68 deletions(-)

diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp
index 2b6b2c1f3c0..d9cafcf9049 100644
--- a/be/src/exec/schema_scanner.cpp
+++ b/be/src/exec/schema_scanner.cpp
@@ -50,7 +50,10 @@
 #include "exec/schema_scanner/schema_workload_groups_scanner.h"
 #include "exec/schema_scanner/schema_workload_sched_policy_scanner.h"
 #include "olap/hll.h"
+#include "pipeline/pipeline_x/dependency.h"
 #include "runtime/define_primitive_type.h"
+#include "runtime/fragment_mgr.h"
+#include "runtime/types.h"
 #include "util/string_util.h"
 #include "util/types.h"
 #include "vec/columns/column.h"
@@ -64,6 +67,7 @@
 #include "vec/core/column_with_type_and_name.h"
 #include "vec/core/types.h"
 #include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_factory.hpp"
 
 namespace doris {
 class ObjectPool;
@@ -84,7 +88,60 @@ Status SchemaScanner::start(RuntimeState* state) {
     return Status::OK();
 }
 
-Status SchemaScanner::get_next_block(vectorized::Block* block, bool* eos) {
+Status SchemaScanner::get_next_block(RuntimeState* state, vectorized::Block* block, bool* eos) {
+    if (_data_block == nullptr) {
+        return Status::InternalError("No data left!");
+    }
+    DCHECK(_async_thread_running == false);
+    RETURN_IF_ERROR(_scanner_status.status());
+    for (size_t i = 0; i < block->columns(); i++) {
+        std::move(*block->get_by_position(i).column)
+                .mutate()
+                ->insert_range_from(*_data_block->get_by_position(i).column, 0,
+                                    _data_block->rows());
+    }
+    _data_block->clear_column_data();
+    *eos = _eos;
+    if (!*eos) {
+        RETURN_IF_ERROR(get_next_block_async(state));
+    }
+    return Status::OK();
+}
+
+Status SchemaScanner::get_next_block_async(RuntimeState* state) {
+    _dependency->block();
+    auto task_ctx = state->get_task_execution_context();
+    RETURN_IF_ERROR(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func(
+            [this, task_ctx, state]() {
+                DCHECK(_async_thread_running == false);
+                auto task_lock = task_ctx.lock();
+                if (task_lock == nullptr) {
+                    _scanner_status.update(Status::InternalError("Task context not exists!"));
+                    return;
+                }
+                SCOPED_ATTACH_TASK(state);
+                _dependency->block();
+                _async_thread_running = true;
+                _finish_dependency->block();
+                if (!_opened) {
+                    _data_block = vectorized::Block::create_unique();
+                    _init_block(_data_block.get());
+                    _scanner_status.update(start(state));
+                    _opened = true;
+                }
+                bool eos = false;
+                _scanner_status.update(get_next_block_internal(_data_block.get(), &eos));
+                _eos = eos;
+                _async_thread_running = false;
+                _dependency->set_ready();
+                if (eos) {
+                    _finish_dependency->set_ready();
+                }
+            }));
+    return Status::OK();
+}
+
+Status SchemaScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
     if (!_is_init) {
         return Status::InternalError("used before initialized.");
     }
@@ -173,6 +230,16 @@ std::unique_ptr<SchemaScanner> SchemaScanner::create(TSchemaTableType::type type
     }
 }
 
+void SchemaScanner::_init_block(vectorized::Block* src_block) {
+    const std::vector<SchemaScanner::ColumnDesc>& columns_desc(get_column_desc());
+    for (int i = 0; i < columns_desc.size(); ++i) {
+        TypeDescriptor descriptor(columns_desc[i].type);
+        auto data_type = vectorized::DataTypeFactory::instance().create_data_type(descriptor, true);
+        src_block->insert(vectorized::ColumnWithTypeAndName(data_type->create_column(), data_type,
+                                                            columns_desc[i].name));
+    }
+}
+
 Status SchemaScanner::fill_dest_column_for_range(vectorized::Block* block, size_t pos,
                                                  const std::vector<void*>& datas) {
     const ColumnDesc& col_desc = _columns[pos];
diff --git a/be/src/exec/schema_scanner.h b/be/src/exec/schema_scanner.h
index a23706ac6a4..4666657af21 100644
--- a/be/src/exec/schema_scanner.h
+++ b/be/src/exec/schema_scanner.h
@@ -22,6 +22,7 @@
 #include <stddef.h>
 #include <stdint.h>
 
+#include <condition_variable>
 #include <memory>
 #include <string>
 #include <vector>
@@ -43,6 +44,10 @@ namespace vectorized {
 class Block;
 }
 
+namespace pipeline {
+class Dependency;
+}
+
 struct SchemaScannerCommonParam {
     SchemaScannerCommonParam()
             : db(nullptr),
@@ -94,15 +99,23 @@ public:
 
     // init object need information, schema etc.
     virtual Status init(SchemaScannerParam* param, ObjectPool* pool);
+    Status get_next_block(RuntimeState* state, vectorized::Block* block, bool* eos);
     // Start to work
     virtual Status start(RuntimeState* state);
-    virtual Status get_next_block(vectorized::Block* block, bool* eos);
+    virtual Status get_next_block_internal(vectorized::Block* block, bool* eos);
     const std::vector<ColumnDesc>& get_column_desc() const { return _columns; }
     // factory function
     static std::unique_ptr<SchemaScanner> create(TSchemaTableType::type type);
     TSchemaTableType::type type() const { return _schema_table_type; }
+    void set_dependency(std::shared_ptr<pipeline::Dependency> dep,
+                        std::shared_ptr<pipeline::Dependency> fin_dep) {
+        _dependency = dep;
+        _finish_dependency = fin_dep;
+    }
+    Status get_next_block_async(RuntimeState* state);
 
 protected:
+    void _init_block(vectorized::Block* src_block);
     Status fill_dest_column_for_range(vectorized::Block* block, size_t pos,
                                       const std::vector<void*>& datas);
 
@@ -125,6 +138,15 @@ protected:
     RuntimeProfile::Counter* _get_table_timer = nullptr;
     RuntimeProfile::Counter* _get_describe_timer = nullptr;
     RuntimeProfile::Counter* _fill_block_timer = nullptr;
+
+    std::shared_ptr<pipeline::Dependency> _dependency = nullptr;
+    std::shared_ptr<pipeline::Dependency> _finish_dependency = nullptr;
+
+    std::unique_ptr<vectorized::Block> _data_block;
+    AtomicStatus _scanner_status;
+    std::atomic<bool> _eos = false;
+    std::atomic<bool> _opened = false;
+    std::atomic<bool> _async_thread_running = false;
 };
 
 } // namespace doris
diff --git a/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp 
b/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp
index 2115a38a6eb..46522a36242 100644
--- a/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp
@@ -137,7 +137,7 @@ Status 
SchemaActiveQueriesScanner::_get_active_queries_block_from_fe() {
     return Status::OK();
 }
 
-Status SchemaActiveQueriesScanner::get_next_block(vectorized::Block* block, 
bool* eos) {
+Status SchemaActiveQueriesScanner::get_next_block_internal(vectorized::Block* 
block, bool* eos) {
     if (!_is_init) {
         return Status::InternalError("Used before initialized.");
     }
diff --git a/be/src/exec/schema_scanner/schema_active_queries_scanner.h 
b/be/src/exec/schema_scanner/schema_active_queries_scanner.h
index 1df5b1f9d74..7e9ae4b8034 100644
--- a/be/src/exec/schema_scanner/schema_active_queries_scanner.h
+++ b/be/src/exec/schema_scanner/schema_active_queries_scanner.h
@@ -36,7 +36,7 @@ public:
     ~SchemaActiveQueriesScanner() override;
 
     Status start(RuntimeState* state) override;
-    Status get_next_block(vectorized::Block* block, bool* eos) override;
+    Status get_next_block_internal(vectorized::Block* block, bool* eos) 
override;
 
     static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;
 
diff --git a/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp 
b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
index f1155796ed4..b35e84a9f9c 100644
--- a/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
+++ b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
@@ -51,7 +51,8 @@ Status SchemaBackendActiveTasksScanner::start(RuntimeState* 
state) {
     return Status::OK();
 }
 
-Status SchemaBackendActiveTasksScanner::get_next_block(vectorized::Block* 
block, bool* eos) {
+Status 
SchemaBackendActiveTasksScanner::get_next_block_internal(vectorized::Block* 
block,
+                                                                bool* eos) {
     if (!_is_init) {
         return Status::InternalError("Used before initialized.");
     }
diff --git a/be/src/exec/schema_scanner/schema_backend_active_tasks.h 
b/be/src/exec/schema_scanner/schema_backend_active_tasks.h
index d8a2a1ffa3f..43819818b57 100644
--- a/be/src/exec/schema_scanner/schema_backend_active_tasks.h
+++ b/be/src/exec/schema_scanner/schema_backend_active_tasks.h
@@ -36,7 +36,7 @@ public:
     ~SchemaBackendActiveTasksScanner() override;
 
     Status start(RuntimeState* state) override;
-    Status get_next_block(vectorized::Block* block, bool* eos) override;
+    Status get_next_block_internal(vectorized::Block* block, bool* eos) 
override;
 
     static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;
 
diff --git a/be/src/exec/schema_scanner/schema_charsets_scanner.cpp 
b/be/src/exec/schema_scanner/schema_charsets_scanner.cpp
index 9bd7ad7919c..ff42e7f5a05 100644
--- a/be/src/exec/schema_scanner/schema_charsets_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_charsets_scanner.cpp
@@ -48,7 +48,7 @@ SchemaCharsetsScanner::SchemaCharsetsScanner()
 
 SchemaCharsetsScanner::~SchemaCharsetsScanner() {}
 
-Status SchemaCharsetsScanner::get_next_block(vectorized::Block* block, bool* 
eos) {
+Status SchemaCharsetsScanner::get_next_block_internal(vectorized::Block* 
block, bool* eos) {
     if (!_is_init) {
         return Status::InternalError("call this before initial.");
     }
diff --git a/be/src/exec/schema_scanner/schema_charsets_scanner.h 
b/be/src/exec/schema_scanner/schema_charsets_scanner.h
index 1f01070875c..d5089c62826 100644
--- a/be/src/exec/schema_scanner/schema_charsets_scanner.h
+++ b/be/src/exec/schema_scanner/schema_charsets_scanner.h
@@ -36,7 +36,7 @@ public:
     SchemaCharsetsScanner();
     ~SchemaCharsetsScanner() override;
 
-    Status get_next_block(vectorized::Block* block, bool* eos) override;
+    Status get_next_block_internal(vectorized::Block* block, bool* eos) 
override;
 
 private:
     struct CharsetStruct {
diff --git a/be/src/exec/schema_scanner/schema_collations_scanner.cpp 
b/be/src/exec/schema_scanner/schema_collations_scanner.cpp
index 812a8cff18e..271c9a6fb78 100644
--- a/be/src/exec/schema_scanner/schema_collations_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_collations_scanner.cpp
@@ -50,7 +50,7 @@ SchemaCollationsScanner::SchemaCollationsScanner()
 
 SchemaCollationsScanner::~SchemaCollationsScanner() {}
 
-Status SchemaCollationsScanner::get_next_block(vectorized::Block* block, bool* 
eos) {
+Status SchemaCollationsScanner::get_next_block_internal(vectorized::Block* 
block, bool* eos) {
     if (!_is_init) {
         return Status::InternalError("call this before initial.");
     }
diff --git a/be/src/exec/schema_scanner/schema_collations_scanner.h 
b/be/src/exec/schema_scanner/schema_collations_scanner.h
index f0f60538cac..2fe200da78d 100644
--- a/be/src/exec/schema_scanner/schema_collations_scanner.h
+++ b/be/src/exec/schema_scanner/schema_collations_scanner.h
@@ -36,7 +36,7 @@ public:
     SchemaCollationsScanner();
     ~SchemaCollationsScanner() override;
 
-    Status get_next_block(vectorized::Block* block, bool* eos) override;
+    Status get_next_block_internal(vectorized::Block* block, bool* eos) 
override;
 
 private:
     struct CollationStruct {
diff --git a/be/src/exec/schema_scanner/schema_columns_scanner.cpp 
b/be/src/exec/schema_scanner/schema_columns_scanner.cpp
index 763f24b9e53..440370c36c9 100644
--- a/be/src/exec/schema_scanner/schema_columns_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_columns_scanner.cpp
@@ -347,7 +347,7 @@ Status SchemaColumnsScanner::_get_new_table() {
     return Status::OK();
 }
 
-Status SchemaColumnsScanner::get_next_block(vectorized::Block* block, bool* 
eos) {
+Status SchemaColumnsScanner::get_next_block_internal(vectorized::Block* block, 
bool* eos) {
     if (!_is_init) {
         return Status::InternalError("use this class before inited.");
     }
diff --git a/be/src/exec/schema_scanner/schema_columns_scanner.h 
b/be/src/exec/schema_scanner/schema_columns_scanner.h
index 2499db7ed82..99150c36d10 100644
--- a/be/src/exec/schema_scanner/schema_columns_scanner.h
+++ b/be/src/exec/schema_scanner/schema_columns_scanner.h
@@ -38,7 +38,7 @@ public:
     SchemaColumnsScanner();
     ~SchemaColumnsScanner() override;
     Status start(RuntimeState* state) override;
-    Status get_next_block(vectorized::Block* block, bool* eos) override;
+    Status get_next_block_internal(vectorized::Block* block, bool* eos) 
override;
 
 private:
     Status _get_new_table();
diff --git a/be/src/exec/schema_scanner/schema_dummy_scanner.cpp 
b/be/src/exec/schema_scanner/schema_dummy_scanner.cpp
index 1d5956f390e..9e3a703d9fb 100644
--- a/be/src/exec/schema_scanner/schema_dummy_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_dummy_scanner.cpp
@@ -40,7 +40,7 @@ Status SchemaDummyScanner::start(RuntimeState* state) {
     return Status::OK();
 }
 
-Status SchemaDummyScanner::get_next_block(vectorized::Block* block, bool* eos) 
{
+Status SchemaDummyScanner::get_next_block_internal(vectorized::Block* block, 
bool* eos) {
     *eos = true;
     return Status::OK();
 }
diff --git a/be/src/exec/schema_scanner/schema_dummy_scanner.h 
b/be/src/exec/schema_scanner/schema_dummy_scanner.h
index a67f6fa25c1..0c5e4aabe35 100644
--- a/be/src/exec/schema_scanner/schema_dummy_scanner.h
+++ b/be/src/exec/schema_scanner/schema_dummy_scanner.h
@@ -33,7 +33,7 @@ public:
     SchemaDummyScanner();
     ~SchemaDummyScanner() override;
     Status start(RuntimeState* state = nullptr) override;
-    Status get_next_block(vectorized::Block* block, bool* eos) override;
+    Status get_next_block_internal(vectorized::Block* block, bool* eos) 
override;
 };
 
 } // namespace doris
diff --git a/be/src/exec/schema_scanner/schema_files_scanner.cpp 
b/be/src/exec/schema_scanner/schema_files_scanner.cpp
index 55b7a338c31..20aa07fa691 100644
--- a/be/src/exec/schema_scanner/schema_files_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_files_scanner.cpp
@@ -113,7 +113,7 @@ Status SchemaFilesScanner::start(RuntimeState* state) {
     return Status::OK();
 }
 
-Status SchemaFilesScanner::get_next_block(vectorized::Block* block, bool* eos) 
{
+Status SchemaFilesScanner::get_next_block_internal(vectorized::Block* block, 
bool* eos) {
     if (!_is_init) {
         return Status::InternalError("Used before initialized.");
     }
diff --git a/be/src/exec/schema_scanner/schema_files_scanner.h 
b/be/src/exec/schema_scanner/schema_files_scanner.h
index 6805a04be4a..bb3b2d68493 100644
--- a/be/src/exec/schema_scanner/schema_files_scanner.h
+++ b/be/src/exec/schema_scanner/schema_files_scanner.h
@@ -38,7 +38,7 @@ public:
     ~SchemaFilesScanner() override;
 
     Status start(RuntimeState* state) override;
-    Status get_next_block(vectorized::Block* block, bool* eos) override;
+    Status get_next_block_internal(vectorized::Block* block, bool* eos) 
override;
 
     int _db_index;
     int _table_index;
diff --git a/be/src/exec/schema_scanner/schema_metadata_name_ids_scanner.cpp 
b/be/src/exec/schema_scanner/schema_metadata_name_ids_scanner.cpp
index ef7b2b69c1e..aacd9431524 100644
--- a/be/src/exec/schema_scanner/schema_metadata_name_ids_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_metadata_name_ids_scanner.cpp
@@ -225,7 +225,7 @@ Status 
SchemaMetadataNameIdsScanner::_fill_block_impl(vectorized::Block* block)
     return Status::OK();
 }
 
-Status SchemaMetadataNameIdsScanner::get_next_block(vectorized::Block* block, 
bool* eos) {
+Status 
SchemaMetadataNameIdsScanner::get_next_block_internal(vectorized::Block* block, 
bool* eos) {
     if (!_is_init) {
         return Status::InternalError("Used before initialized.");
     }
diff --git a/be/src/exec/schema_scanner/schema_metadata_name_ids_scanner.h 
b/be/src/exec/schema_scanner/schema_metadata_name_ids_scanner.h
index 9981d441d85..c3beea77697 100644
--- a/be/src/exec/schema_scanner/schema_metadata_name_ids_scanner.h
+++ b/be/src/exec/schema_scanner/schema_metadata_name_ids_scanner.h
@@ -39,7 +39,7 @@ public:
     ~SchemaMetadataNameIdsScanner() override;
 
     Status start(RuntimeState* state) override;
-    Status get_next_block(vectorized::Block* block, bool* eos) override;
+    Status get_next_block_internal(vectorized::Block* block, bool* eos) 
override;
 
 private:
     Status _get_new_table();
diff --git a/be/src/exec/schema_scanner/schema_partitions_scanner.cpp 
b/be/src/exec/schema_scanner/schema_partitions_scanner.cpp
index f1ad1f594f8..ea7394e15e1 100644
--- a/be/src/exec/schema_scanner/schema_partitions_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_partitions_scanner.cpp
@@ -101,7 +101,7 @@ Status SchemaPartitionsScanner::start(RuntimeState* state) {
     return Status::OK();
 }
 
-Status SchemaPartitionsScanner::get_next_block(vectorized::Block* block, bool* 
eos) {
+Status SchemaPartitionsScanner::get_next_block_internal(vectorized::Block* 
block, bool* eos) {
     if (!_is_init) {
         return Status::InternalError("Used before initialized.");
     }
diff --git a/be/src/exec/schema_scanner/schema_partitions_scanner.h 
b/be/src/exec/schema_scanner/schema_partitions_scanner.h
index 47e1d1fcf87..87e55db984a 100644
--- a/be/src/exec/schema_scanner/schema_partitions_scanner.h
+++ b/be/src/exec/schema_scanner/schema_partitions_scanner.h
@@ -38,7 +38,7 @@ public:
     ~SchemaPartitionsScanner() override;
 
     Status start(RuntimeState* state) override;
-    Status get_next_block(vectorized::Block* block, bool* eos) override;
+    Status get_next_block_internal(vectorized::Block* block, bool* eos) 
override;
 
     int _db_index;
     int _table_index;
diff --git a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp 
b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp
index f5f5bc23634..f1071359d0a 100644
--- a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp
@@ -62,7 +62,7 @@ Status SchemaProcessListScanner::start(RuntimeState* state) {
     return Status::OK();
 }
 
-Status SchemaProcessListScanner::get_next_block(vectorized::Block* block, 
bool* eos) {
+Status SchemaProcessListScanner::get_next_block_internal(vectorized::Block* 
block, bool* eos) {
     if (!_is_init) {
         return Status::InternalError("call this before initial.");
     }
diff --git a/be/src/exec/schema_scanner/schema_processlist_scanner.h 
b/be/src/exec/schema_scanner/schema_processlist_scanner.h
index 8aae87e1ef6..c0b0a47f615 100644
--- a/be/src/exec/schema_scanner/schema_processlist_scanner.h
+++ b/be/src/exec/schema_scanner/schema_processlist_scanner.h
@@ -40,7 +40,7 @@ public:
     ~SchemaProcessListScanner() override;
 
     Status start(RuntimeState* state) override;
-    Status get_next_block(vectorized::Block* block, bool* eos) override;
+    Status get_next_block_internal(vectorized::Block* block, bool* eos) 
override;
 
     static std::vector<SchemaScanner::ColumnDesc> _s_processlist_columns;
 
diff --git a/be/src/exec/schema_scanner/schema_profiling_scanner.cpp 
b/be/src/exec/schema_scanner/schema_profiling_scanner.cpp
index 2f71eb96f26..0a2a64330bb 100644
--- a/be/src/exec/schema_scanner/schema_profiling_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_profiling_scanner.cpp
@@ -88,7 +88,7 @@ Status SchemaProfilingScanner::start(RuntimeState* state) {
     return Status::OK();
 }
 
-Status SchemaProfilingScanner::get_next_block(vectorized::Block* block, bool* 
eos) {
+Status SchemaProfilingScanner::get_next_block_internal(vectorized::Block* 
block, bool* eos) {
     if (!_is_init) {
         return Status::InternalError("Used before initialized.");
     }
diff --git a/be/src/exec/schema_scanner/schema_profiling_scanner.h 
b/be/src/exec/schema_scanner/schema_profiling_scanner.h
index 5399cb14eb4..6b969a478ac 100644
--- a/be/src/exec/schema_scanner/schema_profiling_scanner.h
+++ b/be/src/exec/schema_scanner/schema_profiling_scanner.h
@@ -38,7 +38,7 @@ public:
     ~SchemaProfilingScanner() override;
 
     Status start(RuntimeState* state) override;
-    Status get_next_block(vectorized::Block* block, bool* eos) override;
+    Status get_next_block_internal(vectorized::Block* block, bool* eos) 
override;
 
     static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;
 };
diff --git a/be/src/exec/schema_scanner/schema_routine_scanner.cpp 
b/be/src/exec/schema_scanner/schema_routine_scanner.cpp
index 3d55addee6c..8c263c99d2d 100644
--- a/be/src/exec/schema_scanner/schema_routine_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_routine_scanner.cpp
@@ -141,7 +141,7 @@ Status SchemaRoutinesScanner::get_block_from_fe() {
     return Status::OK();
 }
 
-Status SchemaRoutinesScanner::get_next_block(vectorized::Block* block, bool* 
eos) {
+Status SchemaRoutinesScanner::get_next_block_internal(vectorized::Block* 
block, bool* eos) {
     if (!_is_init) {
         return Status::InternalError("Used before initialized.");
     }
diff --git a/be/src/exec/schema_scanner/schema_routine_scanner.h 
b/be/src/exec/schema_scanner/schema_routine_scanner.h
index 543f9e8e8f6..c60d72340e1 100644
--- a/be/src/exec/schema_scanner/schema_routine_scanner.h
+++ b/be/src/exec/schema_scanner/schema_routine_scanner.h
@@ -36,7 +36,7 @@ public:
     ~SchemaRoutinesScanner() override = default;
 
     Status start(RuntimeState* state) override;
-    Status get_next_block(vectorized::Block* block, bool* eos) override;
+    Status get_next_block_internal(vectorized::Block* block, bool* eos) 
override;
 
     static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;
 
diff --git a/be/src/exec/schema_scanner/schema_rowsets_scanner.cpp 
b/be/src/exec/schema_scanner/schema_rowsets_scanner.cpp
index b760d8bde04..8bfd785ce5d 100644
--- a/be/src/exec/schema_scanner/schema_rowsets_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_rowsets_scanner.cpp
@@ -92,7 +92,7 @@ Status SchemaRowsetsScanner::_get_all_rowsets() {
     return Status::OK();
 }
 
-Status SchemaRowsetsScanner::get_next_block(vectorized::Block* block, bool* 
eos) {
+Status SchemaRowsetsScanner::get_next_block_internal(vectorized::Block* block, 
bool* eos) {
     if (!_is_init) {
         return Status::InternalError("Used before initialized.");
     }
diff --git a/be/src/exec/schema_scanner/schema_rowsets_scanner.h 
b/be/src/exec/schema_scanner/schema_rowsets_scanner.h
index b975cc4231b..cad34fc0494 100644
--- a/be/src/exec/schema_scanner/schema_rowsets_scanner.h
+++ b/be/src/exec/schema_scanner/schema_rowsets_scanner.h
@@ -40,7 +40,7 @@ public:
     ~SchemaRowsetsScanner() override = default;
 
     Status start(RuntimeState* state) override;
-    Status get_next_block(vectorized::Block* block, bool* eos) override;
+    Status get_next_block_internal(vectorized::Block* block, bool* eos) 
override;
 
 private:
     Status _get_all_rowsets();
diff --git a/be/src/exec/schema_scanner/schema_schema_privileges_scanner.cpp 
b/be/src/exec/schema_scanner/schema_schema_privileges_scanner.cpp
index 09c470ff50a..c24b1fbb071 100644
--- a/be/src/exec/schema_scanner/schema_schema_privileges_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_schema_privileges_scanner.cpp
@@ -82,7 +82,7 @@ Status SchemaSchemaPrivilegesScanner::_get_new_table() {
     return Status::OK();
 }
 
-Status SchemaSchemaPrivilegesScanner::get_next_block(vectorized::Block* block, 
bool* eos) {
+Status 
SchemaSchemaPrivilegesScanner::get_next_block_internal(vectorized::Block* 
block, bool* eos) {
     if (!_is_init) {
         return Status::InternalError("Used before initialized.");
     }
diff --git a/be/src/exec/schema_scanner/schema_schema_privileges_scanner.h 
b/be/src/exec/schema_scanner/schema_schema_privileges_scanner.h
index af2ad49634b..9522fba908b 100644
--- a/be/src/exec/schema_scanner/schema_schema_privileges_scanner.h
+++ b/be/src/exec/schema_scanner/schema_schema_privileges_scanner.h
@@ -38,7 +38,7 @@ public:
     ~SchemaSchemaPrivilegesScanner() override;
 
     Status start(RuntimeState* state) override;
-    Status get_next_block(vectorized::Block* block, bool* eos) override;
+    Status get_next_block_internal(vectorized::Block* block, bool* eos) 
override;
 
 private:
     Status _get_new_table();
diff --git a/be/src/exec/schema_scanner/schema_schemata_scanner.cpp 
b/be/src/exec/schema_scanner/schema_schemata_scanner.cpp
index e09817ca310..e70b3b7a32e 100644
--- a/be/src/exec/schema_scanner/schema_schemata_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_schemata_scanner.cpp
@@ -81,7 +81,7 @@ Status SchemaSchemataScanner::start(RuntimeState* state) {
     return Status::OK();
 }
 
-Status SchemaSchemataScanner::get_next_block(vectorized::Block* block, bool* 
eos) {
+Status SchemaSchemataScanner::get_next_block_internal(vectorized::Block* 
block, bool* eos) {
     if (!_is_init) {
         return Status::InternalError("Used before Initialized.");
     }
diff --git a/be/src/exec/schema_scanner/schema_schemata_scanner.h 
b/be/src/exec/schema_scanner/schema_schemata_scanner.h
index 46fad31af1f..39a5ddda495 100644
--- a/be/src/exec/schema_scanner/schema_schemata_scanner.h
+++ b/be/src/exec/schema_scanner/schema_schemata_scanner.h
@@ -38,7 +38,7 @@ public:
     ~SchemaSchemataScanner() override;
 
     Status start(RuntimeState* state) override;
-    Status get_next_block(vectorized::Block* block, bool* eos) override;
+    Status get_next_block_internal(vectorized::Block* block, bool* eos) 
override;
 
 private:
     Status _fill_block_impl(vectorized::Block* block);
diff --git a/be/src/exec/schema_scanner/schema_table_privileges_scanner.cpp 
b/be/src/exec/schema_scanner/schema_table_privileges_scanner.cpp
index 41a3faf7c5a..4d2e6246656 100644
--- a/be/src/exec/schema_scanner/schema_table_privileges_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_table_privileges_scanner.cpp
@@ -84,7 +84,7 @@ Status SchemaTablePrivilegesScanner::_get_new_table() {
     return Status::OK();
 }
 
-Status SchemaTablePrivilegesScanner::get_next_block(vectorized::Block* block, 
bool* eos) {
+Status 
SchemaTablePrivilegesScanner::get_next_block_internal(vectorized::Block* block, 
bool* eos) {
     if (!_is_init) {
         return Status::InternalError("Used before initialized.");
     }
diff --git a/be/src/exec/schema_scanner/schema_table_privileges_scanner.h 
b/be/src/exec/schema_scanner/schema_table_privileges_scanner.h
index aa79c88304b..4cfcc16d358 100644
--- a/be/src/exec/schema_scanner/schema_table_privileges_scanner.h
+++ b/be/src/exec/schema_scanner/schema_table_privileges_scanner.h
@@ -38,7 +38,7 @@ public:
     ~SchemaTablePrivilegesScanner() override;
 
     Status start(RuntimeState* state) override;
-    Status get_next_block(vectorized::Block* block, bool* eos) override;
+    Status get_next_block_internal(vectorized::Block* block, bool* eos) 
override;
 
 private:
     Status _get_new_table();
diff --git a/be/src/exec/schema_scanner/schema_tables_scanner.cpp 
b/be/src/exec/schema_scanner/schema_tables_scanner.cpp
index 375ceb2c470..6aaafe1ae57 100644
--- a/be/src/exec/schema_scanner/schema_tables_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_tables_scanner.cpp
@@ -342,7 +342,7 @@ Status 
SchemaTablesScanner::_fill_block_impl(vectorized::Block* block) {
     return Status::OK();
 }
 
-Status SchemaTablesScanner::get_next_block(vectorized::Block* block, bool* 
eos) {
+Status SchemaTablesScanner::get_next_block_internal(vectorized::Block* block, 
bool* eos) {
     if (!_is_init) {
         return Status::InternalError("Used before initialized.");
     }
diff --git a/be/src/exec/schema_scanner/schema_tables_scanner.h 
b/be/src/exec/schema_scanner/schema_tables_scanner.h
index 11a96bf65d5..7f8eb11f397 100644
--- a/be/src/exec/schema_scanner/schema_tables_scanner.h
+++ b/be/src/exec/schema_scanner/schema_tables_scanner.h
@@ -39,7 +39,7 @@ public:
     ~SchemaTablesScanner() override;
 
     Status start(RuntimeState* state) override;
-    Status get_next_block(vectorized::Block* block, bool* eos) override;
+    Status get_next_block_internal(vectorized::Block* block, bool* eos) 
override;
 
 private:
     Status _get_new_table();
diff --git a/be/src/exec/schema_scanner/schema_user_privileges_scanner.cpp 
b/be/src/exec/schema_scanner/schema_user_privileges_scanner.cpp
index b636ff65fd7..f9f4b272aaa 100644
--- a/be/src/exec/schema_scanner/schema_user_privileges_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_user_privileges_scanner.cpp
@@ -81,7 +81,7 @@ Status SchemaUserPrivilegesScanner::_get_new_table() {
     return Status::OK();
 }
 
-Status SchemaUserPrivilegesScanner::get_next_block(vectorized::Block* block, 
bool* eos) {
+Status SchemaUserPrivilegesScanner::get_next_block_internal(vectorized::Block* 
block, bool* eos) {
     if (!_is_init) {
         return Status::InternalError("Used before initialized.");
     }
diff --git a/be/src/exec/schema_scanner/schema_user_privileges_scanner.h 
b/be/src/exec/schema_scanner/schema_user_privileges_scanner.h
index eb8f3c63f14..ffc3840db67 100644
--- a/be/src/exec/schema_scanner/schema_user_privileges_scanner.h
+++ b/be/src/exec/schema_scanner/schema_user_privileges_scanner.h
@@ -38,7 +38,7 @@ public:
     ~SchemaUserPrivilegesScanner() override;
 
     Status start(RuntimeState* state) override;
-    Status get_next_block(vectorized::Block* block, bool* eos) override;
+    Status get_next_block_internal(vectorized::Block* block, bool* eos) 
override;
 
 private:
     Status _get_new_table();
diff --git a/be/src/exec/schema_scanner/schema_user_scanner.cpp 
b/be/src/exec/schema_scanner/schema_user_scanner.cpp
index 9b153414380..e56f18f05ae 100644
--- a/be/src/exec/schema_scanner/schema_user_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_user_scanner.cpp
@@ -76,7 +76,7 @@ Status SchemaUserScanner::start(RuntimeState* state) {
     return Status::OK();
 }
 
-Status SchemaUserScanner::get_next_block(vectorized::Block* block, bool* eos) {
+Status SchemaUserScanner::get_next_block_internal(vectorized::Block* block, 
bool* eos) {
     if (!_is_init) {
         return Status::InternalError("call this before initial.");
     }
diff --git a/be/src/exec/schema_scanner/schema_user_scanner.h 
b/be/src/exec/schema_scanner/schema_user_scanner.h
index c55f216804d..bdc618eb5a0 100644
--- a/be/src/exec/schema_scanner/schema_user_scanner.h
+++ b/be/src/exec/schema_scanner/schema_user_scanner.h
@@ -40,7 +40,7 @@ public:
     ~SchemaUserScanner() override;
 
     Status start(RuntimeState* state) override;
-    Status get_next_block(vectorized::Block* block, bool* eos) override;
+    Status get_next_block_internal(vectorized::Block* block, bool* eos) 
override;
 
     static std::vector<SchemaScanner::ColumnDesc> _s_user_columns;
 
diff --git a/be/src/exec/schema_scanner/schema_variables_scanner.cpp 
b/be/src/exec/schema_scanner/schema_variables_scanner.cpp
index 491a11f2572..445089b36ab 100644
--- a/be/src/exec/schema_scanner/schema_variables_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_variables_scanner.cpp
@@ -70,7 +70,7 @@ Status SchemaVariablesScanner::start(RuntimeState* state) {
     return Status::OK();
 }
 
-Status SchemaVariablesScanner::get_next_block(vectorized::Block* block, bool* 
eos) {
+Status SchemaVariablesScanner::get_next_block_internal(vectorized::Block* 
block, bool* eos) {
     if (!_is_init) {
         return Status::InternalError("call this before initial.");
     }
diff --git a/be/src/exec/schema_scanner/schema_variables_scanner.h 
b/be/src/exec/schema_scanner/schema_variables_scanner.h
index 2d207ff8b2e..31bbacf713b 100644
--- a/be/src/exec/schema_scanner/schema_variables_scanner.h
+++ b/be/src/exec/schema_scanner/schema_variables_scanner.h
@@ -40,7 +40,7 @@ public:
     ~SchemaVariablesScanner() override;
 
     Status start(RuntimeState* state) override;
-    Status get_next_block(vectorized::Block* block, bool* eos) override;
+    Status get_next_block_internal(vectorized::Block* block, bool* eos) 
override;
 
 private:
     struct VariableStruct {
diff --git a/be/src/exec/schema_scanner/schema_views_scanner.cpp 
b/be/src/exec/schema_scanner/schema_views_scanner.cpp
index 7d9ce671a66..5b5f0c1b729 100644
--- a/be/src/exec/schema_scanner/schema_views_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_views_scanner.cpp
@@ -113,7 +113,7 @@ Status SchemaViewsScanner::_get_new_table() {
     return Status::OK();
 }
 
-Status SchemaViewsScanner::get_next_block(vectorized::Block* block, bool* eos) 
{
+Status SchemaViewsScanner::get_next_block_internal(vectorized::Block* block, 
bool* eos) {
     if (!_is_init) {
         return Status::InternalError("Used before initialized.");
     }
diff --git a/be/src/exec/schema_scanner/schema_views_scanner.h 
b/be/src/exec/schema_scanner/schema_views_scanner.h
index bc473057905..b86ad922e5e 100644
--- a/be/src/exec/schema_scanner/schema_views_scanner.h
+++ b/be/src/exec/schema_scanner/schema_views_scanner.h
@@ -38,7 +38,7 @@ public:
     ~SchemaViewsScanner() override;
 
     Status start(RuntimeState* state) override;
-    Status get_next_block(vectorized::Block* block, bool* eos) override;
+    Status get_next_block_internal(vectorized::Block* block, bool* eos) 
override;
 
 private:
     Status _get_new_table();
diff --git a/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp 
b/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp
index def52df531d..8b0c6be536b 100644
--- a/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp
@@ -114,7 +114,7 @@ Status 
SchemaWorkloadGroupsScanner::_get_workload_groups_block_from_fe() {
     return Status::OK();
 }
 
-Status SchemaWorkloadGroupsScanner::get_next_block(vectorized::Block* block, 
bool* eos) {
+Status SchemaWorkloadGroupsScanner::get_next_block_internal(vectorized::Block* 
block, bool* eos) {
     if (!_is_init) {
         return Status::InternalError("Used before initialized.");
     }
diff --git a/be/src/exec/schema_scanner/schema_workload_groups_scanner.h 
b/be/src/exec/schema_scanner/schema_workload_groups_scanner.h
index bf7a103526d..3121c4dbac1 100644
--- a/be/src/exec/schema_scanner/schema_workload_groups_scanner.h
+++ b/be/src/exec/schema_scanner/schema_workload_groups_scanner.h
@@ -36,7 +36,7 @@ public:
     ~SchemaWorkloadGroupsScanner() override;
 
     Status start(RuntimeState* state) override;
-    Status get_next_block(vectorized::Block* block, bool* eos) override;
+    Status get_next_block_internal(vectorized::Block* block, bool* eos) 
override;
 
     static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;
 
diff --git 
a/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp 
b/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp
index 035d3bfe217..2d91f151f5f 100644
--- a/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp
@@ -106,7 +106,8 @@ Status 
SchemaWorkloadSchedulePolicyScanner::_get_workload_schedule_policy_block_
     return Status::OK();
 }
 
-Status SchemaWorkloadSchedulePolicyScanner::get_next_block(vectorized::Block* 
block, bool* eos) {
+Status 
SchemaWorkloadSchedulePolicyScanner::get_next_block_internal(vectorized::Block* 
block,
+                                                                    bool* eos) 
{
     if (!_is_init) {
         return Status::InternalError("Used before initialized.");
     }
diff --git a/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.h 
b/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.h
index 5284975fe66..da8d9f15c49 100644
--- a/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.h
+++ b/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.h
@@ -36,7 +36,7 @@ public:
     ~SchemaWorkloadSchedulePolicyScanner() override;
 
     Status start(RuntimeState* state) override;
-    Status get_next_block(vectorized::Block* block, bool* eos) override;
+    Status get_next_block_internal(vectorized::Block* block, bool* eos) 
override;
 
     static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;
 
diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp 
b/be/src/pipeline/exec/schema_scan_operator.cpp
index f26b2d706b7..8ff05cc41b7 100644
--- a/be/src/pipeline/exec/schema_scan_operator.cpp
+++ b/be/src/pipeline/exec/schema_scan_operator.cpp
@@ -61,6 +61,7 @@ Status SchemaScanLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     // new one scanner
     _schema_scanner = SchemaScanner::create(schema_table->schema_table_type());
 
     if (nullptr == _schema_scanner) {
         return Status::InternalError("schema scanner get nullptr pointer.");
     }
+    _schema_scanner->set_dependency(_data_dependency, _finish_dependency);  // null-checked above
@@ -72,7 +73,7 @@ Status SchemaScanLocalState::open(RuntimeState* state) {
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
     RETURN_IF_ERROR(PipelineXLocalState<>::open(state));
-    return _schema_scanner->start(state);
+    return _schema_scanner->get_next_block_async(state);
 }
 
 SchemaScanOperatorX::SchemaScanOperatorX(ObjectPool* pool, const TPlanNode& 
tnode, int operator_id,
@@ -239,8 +240,12 @@ Status SchemaScanOperatorX::get_block(RuntimeState* state, 
vectorized::Block* bl
         while (true) {
             RETURN_IF_CANCELLED(state);
 
+            if (local_state._data_dependency->is_blocked_by() != nullptr) {
+                break;
+            }
             // get all slots from schema table.
-            
RETURN_IF_ERROR(local_state._schema_scanner->get_next_block(&src_block, 
&schema_eos));
+            RETURN_IF_ERROR(
+                    local_state._schema_scanner->get_next_block(state, 
&src_block, &schema_eos));
 
             if (schema_eos) {
                 *eos = true;
diff --git a/be/src/pipeline/exec/schema_scan_operator.h 
b/be/src/pipeline/exec/schema_scan_operator.h
index bd336132efb..c026c105e95 100644
--- a/be/src/pipeline/exec/schema_scan_operator.h
+++ b/be/src/pipeline/exec/schema_scan_operator.h
@@ -55,18 +55,30 @@ public:
     ENABLE_FACTORY_CREATOR(SchemaScanLocalState);
 
     SchemaScanLocalState(RuntimeState* state, OperatorXBase* parent)
-            : PipelineXLocalState<>(state, parent) {}
+            : PipelineXLocalState<>(state, parent) {
+        _finish_dependency =
+                std::make_shared<Dependency>(parent->operator_id(), 
parent->node_id(),
+                                             parent->get_name() + 
"_FINISH_DEPENDENCY", true);
+        _data_dependency = std::make_shared<Dependency>(parent->operator_id(), 
parent->node_id(),
+                                                        parent->get_name() + 
"_DEPENDENCY", true);
+    }
     ~SchemaScanLocalState() override = default;
 
     Status init(RuntimeState* state, LocalStateInfo& info) override;
 
     Status open(RuntimeState* state) override;
 
+    Dependency* finishdependency() override { return _finish_dependency.get(); 
}
+    std::vector<Dependency*> dependencies() const override { return 
{_data_dependency.get()}; }
+
 private:
     friend class SchemaScanOperatorX;
 
     SchemaScannerParam _scanner_param;
     std::unique_ptr<SchemaScanner> _schema_scanner;
+
+    std::shared_ptr<Dependency> _finish_dependency;
+    std::shared_ptr<Dependency> _data_dependency;
 };
 
 class SchemaScanOperatorX final : public OperatorX<SchemaScanLocalState> {
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index c4b8b9b9ff0..b43ddb653a8 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -89,20 +89,11 @@ class Dependency : public 
std::enable_shared_from_this<Dependency> {
 public:
     ENABLE_FACTORY_CREATOR(Dependency);
     Dependency(int id, int node_id, std::string name)
-            : _id(id),
-              _node_id(node_id),
-              _name(std::move(name)),
-              _is_write_dependency(false),
-              _ready(false) {}
+            : _id(id), _node_id(node_id), _name(std::move(name)), 
_ready(false) {}
     Dependency(int id, int node_id, std::string name, bool ready)
-            : _id(id),
-              _node_id(node_id),
-              _name(std::move(name)),
-              _is_write_dependency(true),
-              _ready(ready) {}
+            : _id(id), _node_id(node_id), _name(std::move(name)), 
_ready(ready) {}
     virtual ~Dependency() = default;
 
-    bool is_write_dependency() const { return _is_write_dependency; }
     [[nodiscard]] int id() const { return _id; }
     [[nodiscard]] virtual std::string name() const { return _name; }
     BasicSharedState* shared_state() { return _shared_state; }
@@ -119,12 +110,10 @@ public:
     // Notify downstream pipeline tasks this dependency is ready.
     void set_ready();
     void set_ready_to_read() {
-        DCHECK(_is_write_dependency) << debug_string();
         DCHECK(_shared_state->source_deps.size() == 1) << debug_string();
         _shared_state->source_deps.front()->set_ready();
     }
     void set_block_to_read() {
-        DCHECK(_is_write_dependency) << debug_string();
         DCHECK(_shared_state->source_deps.size() == 1) << debug_string();
         _shared_state->source_deps.front()->block();
     }
@@ -167,7 +156,6 @@ protected:
     const int _id;
     const int _node_id;
     const std::string _name;
-    const bool _is_write_dependency;
     std::atomic<bool> _ready;
 
     BasicSharedState* _shared_state = nullptr;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index b723fe02d7a..f05b491d50b 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -149,8 +149,6 @@ Status PipelineXTask::_extract_dependencies() {
     {
         auto* local_state = _state->get_sink_local_state();
         write_dependencies = local_state->dependencies();
-        DCHECK(std::all_of(write_dependencies.begin(), 
write_dependencies.end(),
-                           [](auto* dep) { return dep->is_write_dependency(); 
}));
         auto* fin_dep = local_state->finishdependency();
         if (fin_dep) {
             finish_dependencies.push_back(fin_dep);
diff --git a/be/src/vec/exec/vschema_scan_node.cpp 
b/be/src/vec/exec/vschema_scan_node.cpp
index 9ac4362318b..9e8c7a8d20e 100644
--- a/be/src/vec/exec/vschema_scan_node.cpp
+++ b/be/src/vec/exec/vschema_scan_node.cpp
@@ -249,7 +249,7 @@ Status VSchemaScanNode::get_next(RuntimeState* state, 
vectorized::Block* block,
             RETURN_IF_CANCELLED(state);
 
             // get all slots from schema table.
-            RETURN_IF_ERROR(_schema_scanner->get_next_block(&src_block, 
&schema_eos));
+            
RETURN_IF_ERROR(_schema_scanner->get_next_block_internal(&src_block, 
&schema_eos));
 
             if (schema_eos) {
                 *eos = true;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to