This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1545c36d163 Revert "[bugfix](scannercore) scanner will core in
deconstructor during collect profile (#28727)" (#28931)
1545c36d163 is described below
commit 1545c36d163836d5df3dbc72585043754756a35c
Author: yiguolei <[email protected]>
AuthorDate: Sun Dec 24 20:37:33 2023 +0800
Revert "[bugfix](scannercore) scanner will core in deconstructor during
collect profile (#28727)" (#28931)
This reverts commit 4066de375efe6ff8e156a61df4f9316b3d9eaa4e.
---
be/src/exec/exec_node.h | 4 -
be/src/pipeline/exec/scan_operator.cpp | 64 +++++--
be/src/pipeline/exec/scan_operator.h | 16 +-
be/src/vec/exec/scan/pip_scanner_context.h | 20 ++-
be/src/vec/exec/scan/scanner_context.cpp | 272 +++++++++++++++++++----------
be/src/vec/exec/scan/scanner_context.h | 47 +++--
be/src/vec/exec/scan/scanner_scheduler.cpp | 54 +++---
be/src/vec/exec/scan/scanner_scheduler.h | 4 +-
be/src/vec/exec/scan/vscan_node.cpp | 32 ++--
be/src/vec/exec/scan/vscan_node.h | 23 +--
be/src/vec/exec/scan/vscanner.h | 13 ++
11 files changed, 336 insertions(+), 213 deletions(-)
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index 123097cfd53..eeed37907f9 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -239,10 +239,6 @@ public:
size_t children_count() const { return _children.size(); }
- // when the fragment is normal finished, call this method to do some
finish work
- // such as send the last buffer to remote.
- virtual Status try_close(RuntimeState* state) { return Status::OK(); }
-
protected:
friend class DataSink;
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index 1e0f68131e8..05d9c7292f7 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -64,6 +64,14 @@ bool ScanOperator::can_read() {
}
}
+bool ScanOperator::is_pending_finish() const {
+ return _node->_scanner_ctx && !_node->_scanner_ctx->no_schedule();
+}
+
+Status ScanOperator::try_close(RuntimeState* state) {
+ return _node->try_close(state);
+}
+
bool ScanOperator::runtime_filters_are_ready_or_timeout() {
return _node->runtime_filters_are_ready_or_timeout();
}
@@ -73,8 +81,9 @@ std::string ScanOperator::debug_string() const {
fmt::format_to(debug_string_buffer, "{}, scanner_ctx is null: {} ",
SourceOperator::debug_string(), _node->_scanner_ctx ==
nullptr);
if (_node->_scanner_ctx) {
- fmt::format_to(debug_string_buffer, ", num_running_scanners = {}",
- _node->_scanner_ctx->get_num_running_scanners());
+ fmt::format_to(debug_string_buffer, ", num_running_scanners = {},
num_scheduling_ctx = {} ",
+ _node->_scanner_ctx->get_num_running_scanners(),
+ _node->_scanner_ctx->get_num_scheduling_ctx());
}
return fmt::to_string(debug_string_buffer);
}
@@ -92,6 +101,9 @@ std::string ScanOperator::debug_string() const {
template <typename Derived>
ScanLocalState<Derived>::ScanLocalState(RuntimeState* state, OperatorXBase*
parent)
: ScanLocalStateBase(state, parent) {
+ _finish_dependency = std::make_shared<FinishDependency>(
+ parent->operator_id(), parent->node_id(), parent->get_name() +
"_FINISH_DEPENDENCY",
+ state->get_query_ctx());
_filter_dependency = std::make_shared<RuntimeFilterDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() +
"_FILTER_DEPENDENCY",
state->get_query_ctx());
@@ -165,6 +177,7 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
auto status = _eos ? Status::OK() : _prepare_scanners();
if (_scanner_ctx) {
+ _finish_dependency->block();
DCHECK(!_eos && _num_scanners->value() > 0);
RETURN_IF_ERROR(_scanner_ctx->init());
RETURN_IF_ERROR(state->exec_env()->scanner_scheduler()->submit(_scanner_ctx));
@@ -557,14 +570,15 @@ std::string ScanLocalState<Derived>::debug_string(int
indentation_level) const {
PipelineXLocalState<>::debug_string(indentation_level),
_eos.load());
if (_scanner_ctx) {
fmt::format_to(debug_string_buffer, "");
- fmt::format_to(debug_string_buffer,
- ", Scanner Context: (_is_finished = {}, _should_stop =
{}, "
- "_num_running_scanners={}, "
- " _num_unfinished_scanners = {}, status = {}, error =
{})",
- _scanner_ctx->is_finished(),
_scanner_ctx->should_stop(),
- _scanner_ctx->get_num_running_scanners(),
- _scanner_ctx->get_num_unfinished_scanners(),
- _scanner_ctx->status().to_string(),
_scanner_ctx->status_error());
+ fmt::format_to(
+ debug_string_buffer,
+ ", Scanner Context: (_is_finished = {}, _should_stop = {}, "
+ "_num_running_scanners={}, "
+ "_num_scheduling_ctx = {}, _num_unfinished_scanners = {},
status = {}, error = {})",
+ _scanner_ctx->is_finished(), _scanner_ctx->should_stop(),
+ _scanner_ctx->get_num_running_scanners(),
_scanner_ctx->get_num_scheduling_ctx(),
+ _scanner_ctx->get_num_unfinished_scanners(),
_scanner_ctx->status().to_string(),
+ _scanner_ctx->status_error());
}
return fmt::to_string(debug_string_buffer);
@@ -1212,27 +1226,24 @@ template <typename Derived>
Status ScanLocalState<Derived>::_prepare_scanners() {
std::list<vectorized::VScannerSPtr> scanners;
RETURN_IF_ERROR(_init_scanners(&scanners));
- // Init scanner wrapper
- for (auto it = scanners.begin(); it != scanners.end(); ++it) {
-
_scanners.emplace_back(std::make_shared<vectorized::ScannerDelegate>(*it));
- }
if (scanners.empty()) {
_eos = true;
_scan_dependency->set_ready();
} else {
COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
- RETURN_IF_ERROR(_start_scanners(_scanners));
+ RETURN_IF_ERROR(_start_scanners(scanners));
}
return Status::OK();
}
template <typename Derived>
Status ScanLocalState<Derived>::_start_scanners(
- const std::list<std::shared_ptr<vectorized::ScannerDelegate>>&
scanners) {
+ const std::list<vectorized::VScannerSPtr>& scanners) {
auto& p = _parent->cast<typename Derived::Parent>();
_scanner_ctx = PipScannerContext::create_shared(state(), this,
p._output_tuple_desc, scanners,
p.limit(),
state()->scan_queue_mem_limit(),
- p._col_distribute_ids, 1,
_scan_dependency);
+ p._col_distribute_ids, 1,
_scan_dependency,
+ _finish_dependency);
return Status::OK();
}
@@ -1308,6 +1319,9 @@ Status ScanLocalState<Derived>::_init_profile() {
_max_scanner_thread_num = ADD_COUNTER(_runtime_profile,
"MaxScannerThreadNum", TUnit::UNIT);
+ _wait_for_finish_dependency_timer =
+ ADD_TIMER(_runtime_profile, "WaitForPendingFinishDependency");
+
return Status::OK();
}
@@ -1415,6 +1429,17 @@ Status ScanOperatorX<LocalStateType>::open(RuntimeState*
state) {
return Status::OK();
}
+template <typename LocalStateType>
+Status ScanOperatorX<LocalStateType>::try_close(RuntimeState* state) {
+ auto& local_state = get_local_state(state);
+ if (local_state._scanner_ctx) {
+ // mark this scanner ctx as should_stop to make sure scanners will not
be scheduled anymore
+ // TODO: there is a lock in `set_should_stop` may cause some slight
impact
+ local_state._scanner_ctx->set_should_stop();
+ }
+ return Status::OK();
+}
+
template <typename Derived>
Status ScanLocalState<Derived>::close(RuntimeState* state) {
if (_closed) {
@@ -1426,9 +1451,10 @@ Status ScanLocalState<Derived>::close(RuntimeState*
state) {
SCOPED_TIMER(exec_time_counter());
if (_scanner_ctx) {
- _scanner_ctx->stop_scanners(state);
+
_scanner_ctx->clear_and_join(reinterpret_cast<ScanLocalStateBase*>(this),
state);
}
COUNTER_SET(_wait_for_dependency_timer,
_scan_dependency->watcher_elapse_time());
+ COUNTER_SET(_wait_for_finish_dependency_timer,
_finish_dependency->watcher_elapse_time());
COUNTER_SET(_wait_for_rf_timer, _filter_dependency->watcher_elapse_time());
return PipelineXLocalState<>::close(state);
@@ -1485,7 +1511,7 @@ Status
ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized:
if (eos) {
source_state = SourceState::FINISHED;
// reach limit, stop the scanners.
- local_state._scanner_ctx->stop_scanners(state);
+ local_state._scanner_ctx->set_should_stop();
}
return Status::OK();
diff --git a/be/src/pipeline/exec/scan_operator.h
b/be/src/pipeline/exec/scan_operator.h
index bf083d82d5d..3690e9eb39c 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -31,9 +31,6 @@
namespace doris {
class ExecNode;
} // namespace doris
-namespace doris::vectorized {
-class ScannerDelegate;
-}
namespace doris::pipeline {
class PipScannerContext;
@@ -51,9 +48,13 @@ public:
bool can_read() override; // for source
+ bool is_pending_finish() const override;
+
bool runtime_filters_are_ready_or_timeout() override;
std::string debug_string() const override;
+
+ Status try_close(RuntimeState* state) override;
};
class ScanDependency final : public Dependency {
@@ -170,6 +171,7 @@ protected:
RuntimeProfile::Counter* _wait_for_scanner_done_timer = nullptr;
// time of prefilter input block from scanner
RuntimeProfile::Counter* _wait_for_eos_timer = nullptr;
+ RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr;
RuntimeProfile::Counter* _wait_for_rf_timer = nullptr;
};
@@ -212,6 +214,7 @@ class ScanLocalState : public ScanLocalStateBase {
Dependency* dependency() override { return _scan_dependency.get(); }
RuntimeFilterDependency* filterdependency() override { return
_filter_dependency.get(); };
+ Dependency* finishdependency() override { return _finish_dependency.get();
}
protected:
template <typename LocalStateType>
@@ -347,7 +350,7 @@ protected:
Status _prepare_scanners();
// Submit the scanner to the thread pool and start execution
- Status _start_scanners(const
std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners);
+ Status _start_scanners(const std::list<vectorized::VScannerSPtr>&
scanners);
// For some conjunct there is chance to elimate cast operator
// Eg. Variant's sub column could eliminate cast in storage layer if
@@ -410,13 +413,14 @@ protected:
std::shared_ptr<RuntimeFilterDependency> _filter_dependency;
- // ScanLocalState owns the ownership of scanner, scanner context only has
its weakptr
- std::list<std::shared_ptr<vectorized::ScannerDelegate>> _scanners;
+ std::shared_ptr<Dependency> _finish_dependency;
};
template <typename LocalStateType>
class ScanOperatorX : public OperatorX<LocalStateType> {
public:
+ Status try_close(RuntimeState* state) override;
+
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override { return
OperatorXBase::prepare(state); }
Status open(RuntimeState* state) override;
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h
b/be/src/vec/exec/scan/pip_scanner_context.h
index 56ceb20bf15..309aed96a8c 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -31,9 +31,9 @@ class PipScannerContext : public vectorized::ScannerContext {
public:
PipScannerContext(RuntimeState* state, vectorized::VScanNode* parent,
const TupleDescriptor* output_tuple_desc,
- const
std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
- int64_t limit_, int64_t max_bytes_in_blocks_queue,
- const std::vector<int>& col_distribute_ids, const int
num_parallel_instances)
+ const std::list<vectorized::VScannerSPtr>& scanners,
int64_t limit_,
+ int64_t max_bytes_in_blocks_queue, const
std::vector<int>& col_distribute_ids,
+ const int num_parallel_instances)
: vectorized::ScannerContext(state, parent, output_tuple_desc,
scanners, limit_,
max_bytes_in_blocks_queue,
num_parallel_instances),
_col_distribute_ids(col_distribute_ids),
@@ -41,13 +41,14 @@ public:
PipScannerContext(RuntimeState* state, ScanLocalStateBase* local_state,
const TupleDescriptor* output_tuple_desc,
- const
std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
- int64_t limit_, int64_t max_bytes_in_blocks_queue,
- const std::vector<int>& col_distribute_ids, const int
num_parallel_instances,
- std::shared_ptr<pipeline::ScanDependency> dependency)
+ const std::list<vectorized::VScannerSPtr>& scanners,
int64_t limit_,
+ int64_t max_bytes_in_blocks_queue, const
std::vector<int>& col_distribute_ids,
+ const int num_parallel_instances,
+ std::shared_ptr<pipeline::ScanDependency> dependency,
+ std::shared_ptr<pipeline::Dependency> finish_dependency)
: vectorized::ScannerContext(state, output_tuple_desc, scanners,
limit_,
max_bytes_in_blocks_queue,
num_parallel_instances,
- local_state, dependency),
+ local_state, dependency,
finish_dependency),
_need_colocate_distribute(false) {}
Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr*
block, bool* eos,
@@ -110,6 +111,9 @@ public:
return Status::OK();
}
+ // We should make those method lock free.
+ bool done() override { return _is_finished || _should_stop; }
+
void append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks)
override {
const int queue_size = _blocks_queues.size();
const int block_size = blocks.size();
diff --git a/be/src/vec/exec/scan/scanner_context.cpp
b/be/src/vec/exec/scan/scanner_context.cpp
index 908b2a663b7..5ad2dbec5b6 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -46,11 +46,11 @@ namespace doris::vectorized {
using namespace std::chrono_literals;
ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor*
output_tuple_desc,
- const
std::list<std::shared_ptr<ScannerDelegate>>& scanners,
- int64_t limit_, int64_t
max_bytes_in_blocks_queue,
- const int num_parallel_instances,
+ const std::list<VScannerSPtr>& scanners,
int64_t limit_,
+ int64_t max_bytes_in_blocks_queue, const int
num_parallel_instances,
pipeline::ScanLocalStateBase* local_state,
- std::shared_ptr<pipeline::ScanDependency>
dependency)
+ std::shared_ptr<pipeline::ScanDependency>
dependency,
+ std::shared_ptr<pipeline::Dependency>
finish_dependency)
: _state(state),
_parent(nullptr),
_local_state(local_state),
@@ -61,10 +61,11 @@ ScannerContext::ScannerContext(RuntimeState* state, const
TupleDescriptor* outpu
_max_bytes_in_queue(std::max(max_bytes_in_blocks_queue,
(int64_t)1024) *
num_parallel_instances),
_scanner_scheduler(state->exec_env()->scanner_scheduler()),
- _scanners(scanners.begin(), scanners.end()),
- _all_scanners(scanners.begin(), scanners.end()),
+ _scanners(scanners),
+ _scanners_ref(scanners.begin(), scanners.end()),
_num_parallel_instances(num_parallel_instances),
- _dependency(dependency) {
+ _dependency(dependency),
+ _finish_dependency(finish_dependency) {
// Use the task exec context as a lock between scanner threads and
fragment exection threads
_task_exec_ctx = _state->get_task_execution_context();
_query_id = _state->get_query_ctx()->query_id();
@@ -91,9 +92,8 @@ ScannerContext::ScannerContext(RuntimeState* state, const
TupleDescriptor* outpu
ScannerContext::ScannerContext(doris::RuntimeState* state,
doris::vectorized::VScanNode* parent,
const doris::TupleDescriptor* output_tuple_desc,
- const
std::list<std::shared_ptr<ScannerDelegate>>& scanners,
- int64_t limit_, int64_t
max_bytes_in_blocks_queue,
- const int num_parallel_instances,
+ const std::list<VScannerSPtr>& scanners,
int64_t limit_,
+ int64_t max_bytes_in_blocks_queue, const int
num_parallel_instances,
pipeline::ScanLocalStateBase* local_state)
: _state(state),
_parent(parent),
@@ -105,8 +105,8 @@ ScannerContext::ScannerContext(doris::RuntimeState* state,
doris::vectorized::VS
_max_bytes_in_queue(std::max(max_bytes_in_blocks_queue,
(int64_t)1024) *
num_parallel_instances),
_scanner_scheduler(state->exec_env()->scanner_scheduler()),
- _scanners(scanners.begin(), scanners.end()),
- _all_scanners(scanners.begin(), scanners.end()),
+ _scanners(scanners),
+ _scanners_ref(scanners.begin(), scanners.end()),
_num_parallel_instances(num_parallel_instances) {
// Use the task exec context as a lock between scanner threads and
fragment exection threads
_task_exec_ctx = _state->get_task_execution_context();
@@ -182,6 +182,10 @@ Status ScannerContext::init() {
}
#endif
+ // 4. This ctx will be submitted to the scanner scheduler right after init.
+ // So set _num_scheduling_ctx to 1 here.
+ _num_scheduling_ctx = 1;
+
_num_unfinished_scanners = _scanners.size();
if (_parent) {
@@ -271,9 +275,11 @@ Status ScannerContext::get_block_from_queue(RuntimeState*
state, vectorized::Blo
bool is_scheduled = false;
if (!done() && to_be_schedule && _num_running_scanners == 0) {
is_scheduled = true;
- auto submit_status =
_scanner_scheduler->submit(shared_from_this());
- if (!submit_status.ok()) {
- set_status_on_error(submit_status, false);
+ auto state = _scanner_scheduler->submit(shared_from_this());
+ if (state.ok()) {
+ _num_scheduling_ctx++;
+ } else {
+ set_status_on_error(state, false);
}
}
@@ -364,17 +370,41 @@ Status ScannerContext::validate_block_schema(Block*
block) {
return Status::OK();
}
+void ScannerContext::set_should_stop() {
+ std::lock_guard l(_transfer_lock);
+ _should_stop = true;
+ _set_scanner_done();
+ for (const VScannerWPtr& scanner : _scanners_ref) {
+ if (VScannerSPtr sc = scanner.lock()) {
+ sc->try_stop();
+ }
+ }
+ _blocks_queue_added_cv.notify_one();
+ set_ready_to_finish();
+}
+
void ScannerContext::inc_num_running_scanners(int32_t inc) {
std::lock_guard l(_transfer_lock);
_num_running_scanners += inc;
}
-void ScannerContext::dec_num_running_scanners(int32_t scanner_dec) {
+void ScannerContext::dec_num_scheduling_ctx() {
std::lock_guard l(_transfer_lock);
- _num_running_scanners -= scanner_dec;
+ _num_scheduling_ctx--;
+ set_ready_to_finish();
+ if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) {
+ _ctx_finish_cv.notify_one();
+ }
+}
+
+void ScannerContext::set_ready_to_finish() {
+ // `_should_stop == true` means this task has already ended and wait for
pending finish now.
+ if (_finish_dependency && done() && _num_running_scanners == 0 &&
_num_scheduling_ctx == 0) {
+ _finish_dependency->set_ready();
+ }
}
-void ScannerContext::set_status_on_error(const Status& status, bool need_lock)
{
+bool ScannerContext::set_status_on_error(const Status& status, bool need_lock)
{
std::unique_lock l(_transfer_lock, std::defer_lock);
if (need_lock) {
l.lock();
@@ -385,20 +415,14 @@ void ScannerContext::set_status_on_error(const Status&
status, bool need_lock) {
_blocks_queue_added_cv.notify_one();
_should_stop = true;
_set_scanner_done();
+ return true;
}
+ return false;
}
-void ScannerContext::stop_scanners(RuntimeState* state) {
- std::unique_lock l(_transfer_lock);
- _should_stop = true;
- _set_scanner_done();
- for (const std::weak_ptr<ScannerDelegate>& scanner : _all_scanners) {
- if (std::shared_ptr<ScannerDelegate> sc = scanner.lock()) {
- sc->_scanner->try_stop();
- }
- }
- _blocks_queue.clear();
- // TODO yiguolei, call mark close to scanners
+template <typename Parent>
+Status ScannerContext::_close_and_clear_scanners(Parent* parent, RuntimeState*
state) {
+ std::unique_lock l(_scanners_lock);
if (state->enable_profile()) {
std::stringstream scanner_statistics;
std::stringstream scanner_rows_read;
@@ -406,38 +430,76 @@ void ScannerContext::stop_scanners(RuntimeState* state) {
scanner_statistics << "[";
scanner_rows_read << "[";
scanner_wait_worker_time << "[";
- // Scanners can in 3 state
- // state 1: in scanner context, not scheduled
- // state 2: in scanner worker pool's queue, scheduled but not running
- // state 3: scanner is running.
- for (auto& scanner_ref : _all_scanners) {
- auto scanner = scanner_ref.lock();
- if (scanner == nullptr) {
- continue;
- }
+ for (auto finished_scanner_time : _finished_scanner_runtime) {
+ scanner_statistics << PrettyPrinter::print(finished_scanner_time,
TUnit::TIME_NS)
+ << ", ";
+ }
+ for (auto finished_scanner_rows : _finished_scanner_rows_read) {
+ scanner_rows_read << PrettyPrinter::print(finished_scanner_rows,
TUnit::UNIT) << ", ";
+ }
+ for (auto finished_scanner_wait_time :
_finished_scanner_wait_worker_time) {
+ scanner_wait_worker_time
+ << PrettyPrinter::print(finished_scanner_wait_time,
TUnit::TIME_NS) << ", ";
+ }
+ // Only unfinished scanners here
+ for (auto& scanner : _scanners) {
+ // Scanners are in ObjPool in ScanNode,
+ // so no need to delete them here.
// Add per scanner running time before close them
- scanner_statistics <<
PrettyPrinter::print(scanner->_scanner->get_time_cost_ns(),
- TUnit::TIME_NS)
+ scanner_statistics <<
PrettyPrinter::print(scanner->get_time_cost_ns(), TUnit::TIME_NS)
<< ", ";
- scanner_rows_read <<
PrettyPrinter::print(scanner->_scanner->get_rows_read(),
- TUnit::UNIT)
+ scanner_rows_read <<
PrettyPrinter::print(scanner->get_rows_read(), TUnit::UNIT)
<< ", ";
scanner_wait_worker_time
- <<
PrettyPrinter::print(scanner->_scanner->get_scanner_wait_worker_timer(),
+ <<
PrettyPrinter::print(scanner->get_scanner_wait_worker_timer(),
TUnit::TIME_NS)
<< ", ";
- // since there are all scanners, some scanners is running, so that
could not call scanner
- // close here.
}
scanner_statistics << "]";
scanner_rows_read << "]";
scanner_wait_worker_time << "]";
- _scanner_profile->add_info_string("PerScannerRunningTime",
scanner_statistics.str());
- _scanner_profile->add_info_string("PerScannerRowsRead",
scanner_rows_read.str());
- _scanner_profile->add_info_string("PerScannerWaitTime",
scanner_wait_worker_time.str());
+ parent->scanner_profile()->add_info_string("PerScannerRunningTime",
+ scanner_statistics.str());
+ parent->scanner_profile()->add_info_string("PerScannerRowsRead",
scanner_rows_read.str());
+ parent->scanner_profile()->add_info_string("PerScannerWaitTime",
+
scanner_wait_worker_time.str());
+ }
+ // Only unfinished scanners here
+ for (auto& scanner : _scanners) {
+ static_cast<void>(scanner->close(state));
+ // Scanners are in ObjPool in ScanNode,
+ // so no need to delete them here.
}
+ _scanners.clear();
+ return Status::OK();
+}
- _blocks_queue_added_cv.notify_one();
+template <typename Parent>
+void ScannerContext::clear_and_join(Parent* parent, RuntimeState* state) {
+ std::unique_lock l(_transfer_lock);
+ do {
+ if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) {
+ break;
+ } else {
+ DCHECK(!state->enable_pipeline_exec())
+ << " _num_running_scanners: " << _num_running_scanners
+ << " _num_scheduling_ctx: " << _num_scheduling_ctx;
+ while (!(_num_running_scanners == 0 && _num_scheduling_ctx == 0)) {
+ _ctx_finish_cv.wait(l);
+ }
+ break;
+ }
+ } while (false);
+ // Must wait all running scanners stop running.
+ // So that we can make sure to close all scanners.
+ static_cast<void>(_close_and_clear_scanners(parent, state));
+
+ _blocks_queue.clear();
+}
+
+bool ScannerContext::no_schedule() {
+ std::unique_lock l(_transfer_lock);
+ return _num_running_scanners == 0 && _num_scheduling_ctx == 0;
}
void ScannerContext::_set_scanner_done() {
@@ -450,11 +512,12 @@ std::string ScannerContext::debug_string() {
return fmt::format(
"id: {}, sacnners: {}, blocks in queue: {},"
" status: {}, _should_stop: {}, _is_finished: {}, free blocks: {},"
- " limit: {}, _num_running_scanners: {}, _max_thread_num: {},"
+ " limit: {}, _num_running_scanners: {}, _num_scheduling_ctx: {},
_max_thread_num: {},"
" _block_per_scanner: {}, _cur_bytes_in_queue: {},
MAX_BYTE_OF_QUEUE: {}",
ctx_id, _scanners.size(), _blocks_queue.size(), status().ok(),
_should_stop,
- _is_finished, _free_blocks.size_approx(), limit,
_num_running_scanners, _max_thread_num,
- _block_per_scanner, _cur_bytes_in_queue, _max_bytes_in_queue);
+ _is_finished, _free_blocks.size_approx(), limit,
_num_running_scanners,
+ _num_scheduling_ctx, _max_thread_num, _block_per_scanner,
_cur_bytes_in_queue,
+ _max_bytes_in_queue);
}
void ScannerContext::reschedule_scanner_ctx() {
@@ -462,67 +525,84 @@ void ScannerContext::reschedule_scanner_ctx() {
if (done()) {
return;
}
- auto submit_status = _scanner_scheduler->submit(shared_from_this());
+ auto state = _scanner_scheduler->submit(shared_from_this());
//todo(wb) rethinking is it better to mark current scan_context failed
when submit failed many times?
- if (!submit_status.ok()) {
- set_status_on_error(submit_status, false);
+ if (state.ok()) {
+ _num_scheduling_ctx++;
+ } else {
+ set_status_on_error(state, false);
}
}
-void
ScannerContext::push_back_scanner_and_reschedule(std::shared_ptr<ScannerDelegate>
scanner) {
- std::lock_guard l(_transfer_lock);
- // Use a transfer lock to avoid the scanner be scheduled concurrently. For
example, that after
- // calling "_scanners.push_front(scanner)", there may be other ctx in
scheduler
- // to schedule that scanner right away, and in that schedule run, the
scanner may be marked as closed
- // before we call the following if() block.
- if (scanner->_scanner->need_to_close()) {
- --_num_unfinished_scanners;
- if (_num_unfinished_scanners == 0) {
- _dispose_coloate_blocks_not_in_queue();
- _is_finished = true;
- _set_scanner_done();
- _blocks_queue_added_cv.notify_one();
- return;
- }
+void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
+ {
+ std::unique_lock l(_scanners_lock);
+ _scanners.push_front(scanner);
}
+ std::lock_guard l(_transfer_lock);
- _scanners.push_front(scanner);
+ // In pipeline engine, doris will close scanners when `no_schedule`.
+ // We have to decrease _num_running_scanners before schedule, otherwise
+ // schedule does not woring due to _num_running_scanners.
+ _num_running_scanners--;
+ set_ready_to_finish();
- if (should_be_scheduled()) {
- auto submit_status = _scanner_scheduler->submit(shared_from_this());
- if (!submit_status.ok()) {
- set_status_on_error(submit_status, false);
+ if (!done() && should_be_scheduled()) {
+ auto state = _scanner_scheduler->submit(shared_from_this());
+ if (state.ok()) {
+ _num_scheduling_ctx++;
+ } else {
+ set_status_on_error(state, false);
}
}
+
+ // Notice that after calling "_scanners.push_front(scanner)", there may be
other ctx in scheduler
+ // to schedule that scanner right away, and in that schedule run, the
scanner may be marked as closed
+ // before we call the following if() block.
+ // So we need "scanner->set_counted_down()" to avoid
"_num_unfinished_scanners" being decreased twice by
+ // same scanner.
+ if (scanner->need_to_close() && scanner->set_counted_down() &&
+ (--_num_unfinished_scanners) == 0) {
+ _dispose_coloate_blocks_not_in_queue();
+ _is_finished = true;
+ _set_scanner_done();
+ _blocks_queue_added_cv.notify_one();
+ }
+ _ctx_finish_cv.notify_one();
}
-// This method is called in scanner scheduler, and task context is hold
-void ScannerContext::get_next_batch_of_scanners(
- std::list<std::weak_ptr<ScannerDelegate>>* current_run) {
- std::lock_guard l(_transfer_lock);
- // Update the sched counter for profile
- Defer defer {[&]() { _scanner_sched_counter->update(current_run->size());
}};
+void ScannerContext::get_next_batch_of_scanners(std::list<VScannerSPtr>*
current_run) {
// 1. Calculate how many scanners should be scheduled at this run.
- // If there are enough space in blocks queue,
- // the scanner number depends on the _free_blocks numbers
- int thread_slot_num = get_available_thread_slot_num();
+ int thread_slot_num = 0;
+ {
+ // If there are enough space in blocks queue,
+ // the scanner number depends on the _free_blocks numbers
+ thread_slot_num = get_available_thread_slot_num();
+ }
// 2. get #thread_slot_num scanners from ctx->scanners
// and put them into "this_run".
- for (int i = 0; i < thread_slot_num && !_scanners.empty();) {
- std::weak_ptr<ScannerDelegate> scanner_ref = _scanners.front();
- std::shared_ptr<ScannerDelegate> scanner = scanner_ref.lock();
- _scanners.pop_front();
- if (scanner == nullptr) {
- continue;
- }
- if (scanner->_scanner->need_to_close()) {
- static_cast<void>(scanner->_scanner->close(_state));
- } else {
- current_run->push_back(scanner_ref);
- i++;
+ {
+ std::unique_lock l(_scanners_lock);
+ for (int i = 0; i < thread_slot_num && !_scanners.empty();) {
+ VScannerSPtr scanner = _scanners.front();
+ _scanners.pop_front();
+ if (scanner->need_to_close()) {
+
_finished_scanner_runtime.push_back(scanner->get_time_cost_ns());
+
_finished_scanner_rows_read.push_back(scanner->get_rows_read());
+ _finished_scanner_wait_worker_time.push_back(
+ scanner->get_scanner_wait_worker_timer());
+ static_cast<void>(scanner->close(_state));
+ } else {
+ current_run->push_back(scanner);
+ i++;
+ }
}
}
}
+template void ScannerContext::clear_and_join(pipeline::ScanLocalStateBase*
parent,
+ RuntimeState* state);
+template void ScannerContext::clear_and_join(VScanNode* parent, RuntimeState*
state);
+
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/scanner_context.h
b/be/src/vec/exec/scan/scanner_context.h
index e320eb55b2e..ba9c1fdee10 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -53,7 +53,6 @@ class TaskGroup;
namespace vectorized {
class VScanner;
-class ScannerDelegate;
class VScanNode;
class ScannerScheduler;
class SimplifiedScanScheduler;
@@ -71,7 +70,7 @@ class ScannerContext : public
std::enable_shared_from_this<ScannerContext> {
public:
ScannerContext(RuntimeState* state, VScanNode* parent, const
TupleDescriptor* output_tuple_desc,
- const std::list<std::shared_ptr<ScannerDelegate>>&
scanners, int64_t limit_,
+ const std::list<VScannerSPtr>& scanners, int64_t limit_,
int64_t max_bytes_in_blocks_queue, const int
num_parallel_instances = 1,
pipeline::ScanLocalStateBase* local_state = nullptr);
@@ -93,9 +92,9 @@ public:
// When a scanner complete a scan, this method will be called
// to return the scanner to the list for next scheduling.
- void push_back_scanner_and_reschedule(std::shared_ptr<ScannerDelegate>
scanner);
+ void push_back_scanner_and_reschedule(VScannerSPtr scanner);
- void set_status_on_error(const Status& status, bool need_lock = true);
+ bool set_status_on_error(const Status& status, bool need_lock = true);
Status status() {
if (_process_status.is<ErrorCode::END_OF_FILE>()) {
@@ -104,21 +103,34 @@ public:
return _process_status;
}
+ // Called by ScanNode.
+ // Used to notify the scheduler that this ScannerContext can stop working.
+ void set_should_stop();
+
// Return true if this ScannerContext need no more process
- bool done() const { return _is_finished || _should_stop; }
+ virtual bool done() { return _is_finished || _should_stop; }
bool is_finished() { return _is_finished.load(); }
bool should_stop() { return _should_stop.load(); }
bool status_error() { return _status_error.load(); }
void inc_num_running_scanners(int32_t scanner_inc);
- void dec_num_running_scanners(int32_t scanner_dec);
+ void set_ready_to_finish();
int get_num_running_scanners() const { return _num_running_scanners; }
int get_num_unfinished_scanners() const { return _num_unfinished_scanners;
}
- void get_next_batch_of_scanners(std::list<std::weak_ptr<ScannerDelegate>>*
current_run);
+ void dec_num_scheduling_ctx();
+
+ int get_num_scheduling_ctx() const { return _num_scheduling_ctx; }
+
+ void get_next_batch_of_scanners(std::list<VScannerSPtr>* current_run);
+
+ template <typename Parent>
+ void clear_and_join(Parent* parent, RuntimeState* state);
+
+ bool no_schedule();
virtual std::string debug_string();
@@ -126,6 +138,7 @@ public:
void incr_num_ctx_scheduling(int64_t num) {
_scanner_ctx_sched_counter->update(num); }
void incr_ctx_scheduling_time(int64_t num) {
_scanner_ctx_sched_time->update(num); }
+ void incr_num_scanner_scheduling(int64_t num) {
_scanner_sched_counter->update(num); }
std::string parent_name();
@@ -133,7 +146,7 @@ public:
// todo(wb) rethinking how to calculate ```_max_bytes_in_queue``` when
executing shared scan
inline bool should_be_scheduled() const {
- return !done() && (_cur_bytes_in_queue < _max_bytes_in_queue / 2) &&
+ return (_cur_bytes_in_queue < _max_bytes_in_queue / 2) &&
(_serving_blocks_num < allowed_blocks_num());
}
@@ -156,8 +169,6 @@ public:
SimplifiedScanScheduler* get_simple_scan_scheduler() { return
_simple_scan_scheduler; }
- void stop_scanners(RuntimeState* state);
-
void reschedule_scanner_ctx();
// the unique id of this context
@@ -170,12 +181,17 @@ public:
std::weak_ptr<TaskExecutionContext> get_task_execution_context() { return
_task_exec_ctx; }
+private:
+ template <typename Parent>
+ Status _close_and_clear_scanners(Parent* parent, RuntimeState* state);
+
protected:
ScannerContext(RuntimeState* state_, const TupleDescriptor*
output_tuple_desc,
- const std::list<std::shared_ptr<ScannerDelegate>>&
scanners_, int64_t limit_,
+ const std::list<VScannerSPtr>& scanners_, int64_t limit_,
int64_t max_bytes_in_blocks_queue_, const int
num_parallel_instances,
pipeline::ScanLocalStateBase* local_state,
- std::shared_ptr<pipeline::ScanDependency> dependency);
+ std::shared_ptr<pipeline::ScanDependency> dependency,
+ std::shared_ptr<pipeline::Dependency> finish_dependency);
virtual void _dispose_coloate_blocks_not_in_queue() {}
void _set_scanner_done();
@@ -259,11 +275,9 @@ protected:
// and then if the scanner is not finished, will be pushed back to this
list.
// Not need to protect by lock, because only one scheduler thread will
access to it.
std::mutex _scanners_lock;
- // Scanner's ownership belong to vscannode or scanoperator, scanner
context does not own it.
- // ScannerContext has to check if scanner is deconstructed before use it.
- std::list<std::weak_ptr<ScannerDelegate>> _scanners;
+ std::list<VScannerSPtr> _scanners;
// weak pointer for _scanners, used in stop function
- std::vector<std::weak_ptr<ScannerDelegate>> _all_scanners;
+ std::vector<VScannerWPtr> _scanners_ref;
std::vector<int64_t> _finished_scanner_runtime;
std::vector<int64_t> _finished_scanner_rows_read;
std::vector<int64_t> _finished_scanner_wait_worker_time;
@@ -280,6 +294,7 @@ protected:
RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr;
std::shared_ptr<pipeline::ScanDependency> _dependency = nullptr;
+ std::shared_ptr<pipeline::Dependency> _finish_dependency = nullptr;
};
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index a67b9d7f27a..e8d7f8a7139 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -180,14 +180,20 @@ void
ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
watch.reset();
watch.start();
ctx->incr_num_ctx_scheduling(1);
+ size_t size = 0;
+ Defer defer {[&]() {
+ ctx->incr_num_scanner_scheduling(size);
+ ctx->dec_num_scheduling_ctx();
+ }};
if (ctx->done()) {
return;
}
- std::list<std::weak_ptr<ScannerDelegate>> this_run;
+ std::list<VScannerSPtr> this_run;
ctx->get_next_batch_of_scanners(&this_run);
- if (this_run.empty()) {
+ size = this_run.size();
+ if (!size) {
// There will be 2 cases when this_run is empty:
// 1. The blocks queue reaches limit.
// The consumer will continue scheduling the ctx.
@@ -206,14 +212,9 @@ void
ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
if (ctx->thread_token != nullptr) {
// TODO llj tg how to treat this?
while (iter != this_run.end()) {
- std::shared_ptr<ScannerDelegate> scanner_delegate = (*iter).lock();
- if (scanner_delegate == nullptr) {
- continue;
- }
- scanner_delegate->_scanner->start_wait_worker_timer();
- auto s = ctx->thread_token->submit_func([this, scanner_ref =
*iter, ctx]() {
- this->_scanner_scan(this, ctx, scanner_ref);
- });
+ (*iter)->start_wait_worker_timer();
+ auto s = ctx->thread_token->submit_func(
+ [this, scanner = *iter, ctx] { this->_scanner_scan(this,
ctx, scanner); });
if (s.ok()) {
this_run.erase(iter++);
} else {
@@ -223,32 +224,28 @@ void
ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
}
} else {
while (iter != this_run.end()) {
- std::shared_ptr<ScannerDelegate> scanner_delegate = (*iter).lock();
- if (scanner_delegate == nullptr) {
- continue;
- }
- scanner_delegate->_scanner->start_wait_worker_timer();
- TabletStorageType type =
scanner_delegate->_scanner->get_storage_type();
+ (*iter)->start_wait_worker_timer();
+ TabletStorageType type = (*iter)->get_storage_type();
bool ret = false;
if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
if (auto* scan_sche = ctx->get_simple_scan_scheduler()) {
- auto work_func = [this, scanner_ref = *iter, ctx]() {
- this->_scanner_scan(this, ctx, scanner_ref);
+ auto work_func = [this, scanner = *iter, ctx] {
+ this->_scanner_scan(this, ctx, scanner);
};
SimplifiedScanTask simple_scan_task = {work_func, ctx};
ret =
scan_sche->get_scan_queue()->try_put(simple_scan_task);
} else {
PriorityThreadPool::Task task;
- task.work_function = [this, scanner_ref = *iter, ctx]() {
- this->_scanner_scan(this, ctx, scanner_ref);
+ task.work_function = [this, scanner = *iter, ctx] {
+ this->_scanner_scan(this, ctx, scanner);
};
task.priority = nice;
ret = _local_scan_thread_pool->offer(task);
}
} else {
PriorityThreadPool::Task task;
- task.work_function = [this, scanner_ref = *iter, ctx]() {
- this->_scanner_scan(this, ctx, scanner_ref);
+ task.work_function = [this, scanner = *iter, ctx] {
+ this->_scanner_scan(this, ctx, scanner);
};
task.priority = nice;
ret = _remote_scan_thread_pool->offer(task);
@@ -266,22 +263,13 @@ void
ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
}
void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
- std::shared_ptr<ScannerContext> ctx,
- std::weak_ptr<ScannerDelegate>
scanner_ref) {
- Defer defer {[&]() { ctx->dec_num_running_scanners(1); }};
+ std::shared_ptr<ScannerContext> ctx,
VScannerSPtr scanner) {
auto task_lock = ctx->get_task_execution_context().lock();
if (task_lock == nullptr) {
// LOG(WARNING) << "could not lock task execution context, query " <<
print_id(_query_id)
// << " maybe finished";
return;
}
- // will release scanner if it is the last one, task lock is hold here, to
ensure
- // that scanner could call scannode's method during deconstructor
- std::shared_ptr<ScannerDelegate> scanner_delegate = scanner_ref.lock();
- auto& scanner = scanner_delegate->_scanner;
- if (scanner_delegate == nullptr) {
- return;
- }
SCOPED_ATTACH_TASK(scanner->runtime_state());
// for cpu hard limit, thread name should not be reset
if (ctx->_should_reset_thread_name) {
@@ -412,7 +400,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler*
scheduler,
if (eos || should_stop) {
scanner->mark_to_need_to_close();
}
- ctx->push_back_scanner_and_reschedule(scanner_delegate);
+ ctx->push_back_scanner_and_reschedule(scanner);
}
void ScannerScheduler::_register_metrics() {
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h
b/be/src/vec/exec/scan/scanner_scheduler.h
index 9fedd27dbd8..eb4d1380e39 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -36,7 +36,7 @@ class BlockingQueue;
} // namespace doris
namespace doris::vectorized {
-class ScannerDelegate;
+
class ScannerContext;
// Responsible for the scheduling and execution of all Scanners of a BE node.
@@ -79,7 +79,7 @@ private:
void _schedule_scanners(std::shared_ptr<ScannerContext> ctx);
// execution thread function
void _scanner_scan(ScannerScheduler* scheduler,
std::shared_ptr<ScannerContext> ctx,
- std::weak_ptr<ScannerDelegate> scanner);
+ VScannerSPtr scanner);
void _register_metrics();
diff --git a/be/src/vec/exec/scan/vscan_node.cpp
b/be/src/vec/exec/scan/vscan_node.cpp
index b780fc1a8a9..5176d7900b3 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -273,7 +273,7 @@ Status VScanNode::get_next(RuntimeState* state,
vectorized::Block* block, bool*
reached_limit(block, eos);
if (*eos) {
// reach limit, stop the scanners.
- _scanner_ctx->stop_scanners(state);
+ _scanner_ctx->set_should_stop();
}
return Status::OK();
@@ -318,8 +318,8 @@ Status VScanNode::_init_profile() {
return Status::OK();
}
-void VScanNode::_start_scanners(const
std::list<std::shared_ptr<ScannerDelegate>>& scanners,
- const int query_parallel_instance_num) {
+Status VScanNode::_start_scanners(const std::list<VScannerSPtr>& scanners,
+ const int query_parallel_instance_num) {
if (_is_pipeline_scan) {
int max_queue_size = _shared_scan_opt ?
std::max(query_parallel_instance_num, 1) : 1;
_scanner_ctx = pipeline::PipScannerContext::create_shared(
@@ -329,29 +329,41 @@ void VScanNode::_start_scanners(const
std::list<std::shared_ptr<ScannerDelegate>
_scanner_ctx = ScannerContext::create_shared(_state, this,
_output_tuple_desc, scanners,
limit(),
_state->scan_queue_mem_limit());
}
+ return Status::OK();
}
Status VScanNode::close(RuntimeState* state) {
if (is_closed()) {
return Status::OK();
}
-
RETURN_IF_ERROR(ExecNode::close(state));
return Status::OK();
}
void VScanNode::release_resource(RuntimeState* state) {
if (_scanner_ctx) {
- if (!state->enable_pipeline_exec() || _should_create_scanner) {
+ if (!state->enable_pipeline_exec()) {
// stop and wait the scanner scheduler to be done
// _scanner_ctx may not be created for some short circuit case.
- _scanner_ctx->stop_scanners(state);
+ _scanner_ctx->set_should_stop();
+ _scanner_ctx->clear_and_join(this, state);
+ } else if (_should_create_scanner) {
+ _scanner_ctx->clear_and_join(this, state);
}
}
- _scanners.clear();
+
ExecNode::release_resource(state);
}
+Status VScanNode::try_close(RuntimeState* state) {
+ if (_scanner_ctx) {
+ // mark this scanner ctx as should_stop to make sure scanners will not
be scheduled anymore
+ // TODO: there is a lock in `set_should_stop` may cause some slight
impact
+ _scanner_ctx->set_should_stop();
+ }
+ return Status::OK();
+}
+
Status VScanNode::_normalize_conjuncts() {
// The conjuncts is always on output tuple, so use _output_tuple_desc;
std::vector<SlotDescriptor*> slots = _output_tuple_desc->slots();
@@ -1317,15 +1329,11 @@ VScanNode::PushDownType
VScanNode::_should_push_down_in_predicate(VInPredicate*
Status VScanNode::_prepare_scanners(const int query_parallel_instance_num) {
std::list<VScannerSPtr> scanners;
RETURN_IF_ERROR(_init_scanners(&scanners));
- // Init scanner wrapper
- for (auto it = scanners.begin(); it != scanners.end(); ++it) {
- _scanners.emplace_back(std::make_shared<ScannerDelegate>(*it));
- }
if (scanners.empty()) {
_eos = true;
} else {
COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
- _start_scanners(_scanners, query_parallel_instance_num);
+ RETURN_IF_ERROR(_start_scanners(scanners,
query_parallel_instance_num));
}
return Status::OK();
}
diff --git a/be/src/vec/exec/scan/vscan_node.h
b/be/src/vec/exec/scan/vscan_node.h
index d4a054cacd5..5917d0ff46b 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -87,16 +87,6 @@ struct FilterPredicates {
std::vector<std::pair<std::string, std::shared_ptr<HybridSetBase>>>
in_filters;
};
-// We want to close scanner automatically, so using a delegate class
-// and call close method in the delegate class's dctor.
-class ScannerDelegate {
-public:
- VScannerSPtr _scanner;
- ScannerDelegate(VScannerSPtr& scanner_ptr) : _scanner(scanner_ptr) {}
- ~ScannerDelegate() {
static_cast<void>(_scanner->close(_scanner->runtime_state())); }
- ScannerDelegate(ScannerDelegate&&) = delete;
-};
-
class VScanNode : public ExecNode, public RuntimeFilterConsumer {
public:
VScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl&
descs)
@@ -166,6 +156,8 @@ public:
Status alloc_resource(RuntimeState* state) override;
void release_resource(RuntimeState* state) override;
+ Status try_close(RuntimeState* state);
+
bool should_run_serial() const {
return _should_run_serial || _state->enable_scan_node_run_serial();
}
@@ -270,11 +262,8 @@ protected:
int _max_scan_key_num;
int _max_pushdown_conditions_per_column;
- // ScanNode owns the ownership of scanner, scanner context only has its
weakptr
- std::list<std::shared_ptr<ScannerDelegate>> _scanners;
-
- // Each scan node will generates a ScannerContext to do schedule work
- // ScannerContext will be added to scanner scheduler
+ // Each scan node will generates a ScannerContext to manage all Scanners.
+ // See comments of ScannerContext for more details
std::shared_ptr<ScannerContext> _scanner_ctx = nullptr;
// indicate this scan node has no more data to return
@@ -448,8 +437,8 @@ private:
const std::string& fn_name, int
slot_ref_child = -1);
// Submit the scanner to the thread pool and start execution
- void _start_scanners(const std::list<std::shared_ptr<ScannerDelegate>>&
scanners,
- const int query_parallel_instance_num);
+ Status _start_scanners(const std::list<VScannerSPtr>& scanners,
+ const int query_parallel_instance_num);
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 6046d87ac91..29daf9a68c5 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -145,6 +145,16 @@ public:
void set_status_on_failure(const Status& st) { _status = st; }
+ // return false if _is_counted_down is already true,
+ // otherwise, set _is_counted_down to true and return true.
+ bool set_counted_down() {
+ if (_is_counted_down) {
+ return false;
+ }
+ _is_counted_down = true;
+ return true;
+ }
+
protected:
void _discard_conjuncts() {
for (auto& conjunct : _conjuncts) {
@@ -205,6 +215,8 @@ protected:
int64_t _scan_cpu_timer = 0;
bool _is_load = false;
+ // set to true after decrease the "_num_unfinished_scanners" in scanner
context
+ bool _is_counted_down = false;
bool _is_init = true;
@@ -215,5 +227,6 @@ protected:
};
using VScannerSPtr = std::shared_ptr<VScanner>;
+using VScannerWPtr = std::weak_ptr<VScanner>;
} // namespace doris::vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]