This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new 3ba5406e00a [fix] Use separate memory sufficient dependency for each
PipelineTask (#42198)
3ba5406e00a is described below
commit 3ba5406e00a353e342d936f5d3b077531bd515fd
Author: Jerry Hu <[email protected]>
AuthorDate: Mon Oct 21 17:41:26 2024 +0800
[fix] Use separate memory sufficient dependency for each PipelineTask
(#42198)
## Proposed changes
1. [opt] Limit the number of scanners in FileScanOperator
2. [fix] avoid finishing spilling streams repeatedly in partitioned join
3. [fix] Query blocking issue caused by pending block in PipelineTask
---
be/src/pipeline/exec/file_scan_operator.cpp | 15 ++++-
.../exec/partitioned_hash_join_sink_operator.cpp | 5 ++
.../exec/partitioned_hash_join_sink_operator.h | 1 +
be/src/pipeline/pipeline_fragment_context.cpp | 13 ++++
be/src/pipeline/pipeline_fragment_context.h | 2 +
be/src/pipeline/pipeline_task.cpp | 70 ++++++++++------------
be/src/pipeline/pipeline_task.h | 6 +-
be/src/runtime/query_context.cpp | 21 +++----
be/src/runtime/query_context.h | 5 --
.../workload_group/workload_group_manager.cpp | 3 +-
10 files changed, 82 insertions(+), 59 deletions(-)
diff --git a/be/src/pipeline/exec/file_scan_operator.cpp
b/be/src/pipeline/exec/file_scan_operator.cpp
index 6fa7401e278..7018c279d35 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -60,9 +60,20 @@ std::string FileScanLocalState::name_suffix() const {
void FileScanLocalState::set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>&
scan_ranges) {
+ auto wg_ptr = state->get_query_ctx()->workload_group();
_max_scanners =
config::doris_scanner_thread_pool_thread_num /
state->query_parallel_instance_num();
- _max_scanners = std::max(std::max(_max_scanners,
state->parallel_scan_max_scanners_count()), 1);
+ if (wg_ptr && state->get_query_ctx()->enable_query_slot_hard_limit()) {
+ const auto total_slots = wg_ptr->total_query_slot_count();
+ const auto query_slots = state->get_query_ctx()->get_slot_count();
+ _max_scanners = _max_scanners * query_slots / total_slots;
+ }
+
+ const auto parallel_scan_max_scanners_count =
state->parallel_scan_max_scanners_count();
+ if (parallel_scan_max_scanners_count > 0) {
+ _max_scanners =
+ std::max(std::min(_max_scanners,
state->parallel_scan_max_scanners_count()), 1);
+ }
// For select * from table limit 10; should just use one thread.
if (should_run_serial()) {
_max_scanners = 1;
@@ -82,7 +93,7 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state,
std::make_shared<vectorized::LocalSplitSourceConnector>(scan_ranges,
_max_scanners);
}
_max_scanners = std::min(_max_scanners, _split_source->num_scan_ranges());
- if (scan_ranges.size() > 0 &&
+ if (!scan_ranges.empty() &&
scan_ranges[0].scan_range.ext_scan_range.file_scan_range.__isset.params) {
// for compatibility.
// in new implement, the tuple id is set in prepare phase
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index d3d010e0d7c..baa99d3fe14 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -412,6 +412,11 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
}
Status PartitionedHashJoinSinkLocalState::_finish_spilling() {
+ bool expected = false;
+ if (!_spilling_finished.compare_exchange_strong(expected, true)) {
+ return Status::OK();
+ }
+
for (auto& stream : _shared_state->spilled_streams) {
if (stream) {
RETURN_IF_ERROR(stream->spill_eof());
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 8a844e69963..97c40d43a66 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -67,6 +67,7 @@ protected:
friend class PartitionedHashJoinSinkOperatorX;
+ std::atomic<bool> _spilling_finished {false};
vectorized::Block _pending_block;
bool _child_eos {false};
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 6e6c2bd3e73..03db7e674f1 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1872,6 +1872,19 @@ std::vector<PipelineTask*>
PipelineFragmentContext::get_revocable_tasks() const
return revocable_tasks;
}
+void PipelineFragmentContext::set_memory_sufficient(bool sufficient) {
+ for (const auto& task_instances : _tasks) {
+ for (const auto& task : task_instances) {
+ auto* dependency = task->get_memory_sufficient_dependency();
+ if (sufficient) {
+ dependency->set_ready();
+ } else {
+ dependency->block();
+ }
+ }
+ }
+}
+
std::string PipelineFragmentContext::debug_string() {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "PipelineFragmentContext Info:\n");
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index 9c2ed36b919..8d55d0ce285 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -121,6 +121,8 @@ public:
[[nodiscard]] std::vector<PipelineTask*> get_revocable_tasks() const;
+ void set_memory_sufficient(bool sufficient);
+
void instance_ids(std::vector<TUniqueId>& ins_ids) const {
ins_ids.resize(_fragment_instance_ids.size());
for (size_t i = 0; i < _fragment_instance_ids.size(); i++) {
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index b3282810d86..affb8c44382 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -17,6 +17,7 @@
#include "pipeline_task.h"
+#include <fmt/core.h>
#include <fmt/format.h>
#include <gen_cpp/Metrics_types.h>
#include <glog/logging.h>
@@ -71,15 +72,18 @@ PipelineTask::PipelineTask(
_sink(pipeline->sink_shared_pointer()),
_le_state_map(std::move(le_state_map)),
_task_idx(task_idx),
- _execution_dep(state->get_query_ctx()->get_execution_dependency()),
- _memory_sufficient_dependency(
- state->get_query_ctx()->get_memory_sufficient_dependency()) {
+ _execution_dep(state->get_query_ctx()->get_execution_dependency()) {
_pipeline_task_watcher.start();
auto shared_state = _sink->create_shared_state();
if (shared_state) {
_sink_shared_state = shared_state;
}
+
+ const auto dependency_name =
+ fmt::format("MemorySufficientDependency_{}_{}", _sink->node_id(),
task_id);
+ _memory_sufficient_dependency =
+ pipeline::Dependency::create_unique(-1, -1, dependency_name, true);
}
Status PipelineTask::prepare(const TPipelineInstanceParams& local_params,
const TDataSink& tsink,
@@ -317,6 +321,12 @@ Status PipelineTask::execute(bool* eos) {
SCOPED_ATTACH_TASK(_state);
_eos = _sink->is_finished(_state) || _eos || _wake_up_by_downstream;
*eos = _eos;
+
+ // If `_wake_up_by_downstream` is true, the pending block will not be sank.
+ if (_wake_up_by_downstream) {
+ _pending_block.reset();
+ }
+
if (_eos && !_pending_block) {
// If task is waken up by finish dependency, `_eos` is set to true by
last execution, and we should return here.
return Status::OK();
@@ -388,26 +398,22 @@ Status PipelineTask::execute(bool* eos) {
// Every loop should check if memory is not enough.
// _state->get_query_ctx()->update_low_memory_mode();
- // `_dry_run` means sink operator need no more data
- // `_sink->is_finished(_state)` means sink operator should be finished
- int64_t reserve_size = 0;
- bool has_enough_memory = true;
- if (_dry_run || _sink->is_finished(_state)) {
- *eos = true;
- _eos = true;
- } else if (_pending_block) [[unlikely]] {
+ if (_pending_block) [[unlikely]] {
LOG(INFO) << "query: " << print_id(query_id)
<< " has pending block, size: " <<
_pending_block->allocated_bytes();
_block = std::move(_pending_block);
block = _block.get();
+ }
+ // `_dry_run` means sink operator need no more data
+ // `_sink->is_finished(_state)` means sink operator should be finished
+ else if (_dry_run || _sink->is_finished(_state)) {
+ *eos = true;
+ _eos = true;
} else {
SCOPED_TIMER(_get_block_timer);
DEFER_RELEASE_RESERVED();
_get_block_counter->update(1);
- // size_t sink_reserve_size = _sink->get_reserve_mem_size(_state);
- // sink_reserve_size =
- // std::max(sink_reserve_size,
_state->minimum_operator_memory_required_bytes());
- reserve_size = _root->get_reserve_mem_size(_state);
+ const auto reserve_size = _root->get_reserve_mem_size(_state);
_root->reset_reserve_mem_size(_state);
auto workload_group = _state->get_query_ctx()->workload_group();
@@ -426,19 +432,14 @@ Status PipelineTask::execute(bool* eos) {
<< ", debug info: " <<
GlobalMemoryArbitrator::process_mem_log_str();
_state->get_query_ctx()->update_paused_reason(st);
- // _state->get_query_ctx()->set_low_memory_mode();
- bool is_high_wartermark = false;
- bool is_low_wartermark = false;
- workload_group->check_mem_used(&is_low_wartermark,
&is_high_wartermark);
- if (is_low_wartermark || is_high_wartermark) {
-
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
- _state->get_query_ctx()->shared_from_this(),
reserve_size);
- continue;
- }
- has_enough_memory = false;
+ _state->get_query_ctx()->set_low_memory_mode();
+
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
+ _state->get_query_ctx()->shared_from_this(),
reserve_size);
+ continue;
}
}
+ DCHECK_EQ(_pending_block.get(), nullptr);
RETURN_IF_ERROR(_root->get_block_after_projects(_state, block,
eos));
}
@@ -447,9 +448,10 @@ Status PipelineTask::execute(bool* eos) {
Status status = Status::OK();
DEFER_RELEASE_RESERVED();
COUNTER_UPDATE(_memory_reserve_times, 1);
- size_t sink_reserve_size = _sink->get_reserve_mem_size(_state,
*eos);
+ const auto sink_reserve_size = _sink->get_reserve_mem_size(_state,
*eos);
status = thread_context()->try_reserve_memory(sink_reserve_size);
if (!status.ok()) {
+ COUNTER_UPDATE(_memory_reserve_failed_times, 1);
LOG(INFO) << "query: " << print_id(query_id) << ", try to
reserve: "
<< PrettyPrinter::print(sink_reserve_size,
TUnit::BYTES)
<< ", sink name: " << _sink->get_name()
@@ -457,11 +459,12 @@ Status PipelineTask::execute(bool* eos) {
<< ", failed: " << status.to_string()
<< ", debug info: " <<
GlobalMemoryArbitrator::process_mem_log_str();
_state->get_query_ctx()->update_paused_reason(status);
- _memory_sufficient_dependency->block();
+ _state->get_query_ctx()->set_low_memory_mode();
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
_state->get_query_ctx()->shared_from_this(),
sink_reserve_size);
+ DCHECK_EQ(_pending_block.get(), nullptr);
_pending_block = std::move(_block);
- _block = vectorized::Block::create_unique();
+ _block =
vectorized::Block::create_unique(_pending_block->clone_empty());
_eos = *eos;
*eos = false;
continue;
@@ -484,17 +487,6 @@ Status PipelineTask::execute(bool* eos) {
return Status::OK();
}
}
-
- if (!has_enough_memory) {
- COUNTER_UPDATE(_yield_counts, 1);
-
- LOG(INFO) << "query: " << print_id(query_id) << ", task: " <<
(void*)this
- << ", insufficient memory. reserve_size: "
- << PrettyPrinter::print(reserve_size, TUnit::BYTES);
- ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
- _state->get_query_ctx()->shared_from_this(), reserve_size);
- break;
- }
}
static_cast<void>(get_task_queue()->push_back(this));
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 44dfdd7832a..a3505f7a407 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -244,6 +244,10 @@ public:
_spill_dependencies.emplace_back(dependency);
}
+ Dependency* get_memory_sufficient_dependency() const {
+ return _memory_sufficient_dependency.get();
+ }
+
private:
friend class RuntimeFilterDependency;
bool _is_blocked();
@@ -325,7 +329,7 @@ private:
Dependency* _execution_dep = nullptr;
std::atomic<bool> _wake_up_by_downstream = false;
- Dependency* _memory_sufficient_dependency = nullptr;
+ std::unique_ptr<Dependency> _memory_sufficient_dependency;
std::atomic<bool> _finalized {false};
std::mutex _dependency_lock;
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 527c7ca684d..a1c89394c7b 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -92,8 +92,6 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv*
exec_env,
_query_watcher.start();
_shared_hash_table_controller.reset(new
vectorized::SharedHashTableController());
_execution_dependency = pipeline::Dependency::create_unique(-1, -1,
"ExecutionDependency");
- _memory_sufficient_dependency =
- pipeline::Dependency::create_unique(-1, -1,
"MemorySufficientDependency", true);
_runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(
TUniqueId(), RuntimeFilterParamsContext::create(this),
query_mem_tracker);
@@ -203,7 +201,6 @@ QueryContext::~QueryContext() {
}
_runtime_filter_mgr.reset();
_execution_dependency.reset();
- _memory_sufficient_dependency.reset();
_shared_hash_table_controller.reset();
_runtime_predicates.clear();
file_scan_range_params_map.clear();
@@ -239,12 +236,18 @@ void QueryContext::set_memory_sufficient(bool sufficient)
{
_paused_timer.stop();
_paused_period_secs += _paused_timer.elapsed_time() / (1000L *
1000L * 1000L);
}
- _memory_sufficient_dependency->set_ready();
} else {
- _memory_sufficient_dependency->block();
_paused_timer.start();
++_paused_count;
}
+
+ for (auto&& [fragment_id, fragment_wptr] : _fragment_id_to_pipeline_ctx) {
+ auto fragment_ctx = fragment_wptr.lock();
+ if (!fragment_ctx) {
+ continue;
+ }
+ fragment_ctx->set_memory_sufficient(sufficient);
+ }
}
void QueryContext::cancel(Status new_status, int fragment_id) {
@@ -530,15 +533,13 @@ std::vector<pipeline::PipelineTask*>
QueryContext::get_revocable_tasks() const {
std::string QueryContext::debug_string() {
std::lock_guard l(_paused_mutex);
return fmt::format(
- "QueryId={}, Memory [Used={}, Limit={}, Peak={}], "
- "Spill[RunningSpillTaskCnt={}, TotalPausedPeriodSecs={}, "
- "MemorySufficient={}, LatestPausedReason={}]",
+ "QueryId={}, Memory [Used={}, Limit={}, Peak={}],
Spill[RunningSpillTaskCnt={}, "
+ "TotalPausedPeriodSecs={}, LatestPausedReason={}]",
print_id(_query_id),
PrettyPrinter::print(query_mem_tracker->consumption(),
TUnit::BYTES),
PrettyPrinter::print(query_mem_tracker->limit(), TUnit::BYTES),
PrettyPrinter::print(query_mem_tracker->peak_consumption(),
TUnit::BYTES),
- _revoking_tasks_count, _paused_period_secs,
_memory_sufficient_dependency->ready(),
- _paused_reason.to_string());
+ _revoking_tasks_count, _paused_period_secs,
_paused_reason.to_string());
}
std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 9dd75cb340d..f16bd0fcf95 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -184,10 +184,6 @@ public:
pipeline::Dependency* get_execution_dependency() { return
_execution_dependency.get(); }
- pipeline::Dependency* get_memory_sufficient_dependency() {
- return _memory_sufficient_dependency.get();
- }
-
std::vector<pipeline::PipelineTask*> get_revocable_tasks() const;
Status revoke_memory();
@@ -402,7 +398,6 @@ private:
vectorized::SimplifiedScanScheduler* _remote_scan_task_scheduler = nullptr;
std::unique_ptr<pipeline::Dependency> _execution_dependency;
- std::unique_ptr<pipeline::Dependency> _memory_sufficient_dependency;
std::vector<std::weak_ptr<pipeline::PipelineTask>> _pipeline_tasks;
std::shared_ptr<QueryStatistics> _cpu_statistics = nullptr;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp
b/be/src/runtime/workload_group/workload_group_manager.cpp
index c2c51a35429..b42aeeb1b43 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -692,8 +692,7 @@ void
WorkloadGroupMgr::update_queries_limit(WorkloadGroupPtr wg, bool enable_har
query_ctx->set_expected_mem_limit(expected_query_weighted_mem_limit);
}
}
- LOG(INFO) << debug_msg;
- //LOG_EVERY_T(INFO, 60) << debug_msg;
+ LOG_EVERY_T(INFO, 60) << debug_msg;
}
void WorkloadGroupMgr::stop() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]