This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new d46495ebf72 Add spill metrics and improve spill log printing (#46029)
d46495ebf72 is described below
commit d46495ebf72557fd3cecdffb1bd9870131acd60c
Author: TengJianPing <[email protected]>
AuthorDate: Fri Dec 27 14:41:31 2024 +0800
Add spill metrics and improve spill log printing (#46029)
---
be/src/olap/memtable_flush_executor.cpp | 3 +-
.../exec/partitioned_aggregation_sink_operator.cpp | 25 +++---
.../partitioned_aggregation_source_operator.cpp | 32 +++----
.../exec/partitioned_hash_join_probe_operator.cpp | 98 +++++++++++-----------
.../exec/partitioned_hash_join_sink_operator.cpp | 50 +++++------
be/src/pipeline/exec/spill_sort_sink_operator.cpp | 15 ++--
.../pipeline/exec/spill_sort_source_operator.cpp | 23 ++---
be/src/vec/exec/scan/scanner_scheduler.cpp | 8 +-
be/src/vec/spill/spill_reader.cpp | 11 ++-
be/src/vec/spill/spill_stream_manager.cpp | 21 +++++
be/src/vec/spill/spill_stream_manager.h | 22 +++++
be/src/vec/spill/spill_writer.cpp | 2 +
12 files changed, 186 insertions(+), 124 deletions(-)
diff --git a/be/src/olap/memtable_flush_executor.cpp
b/be/src/olap/memtable_flush_executor.cpp
index 5533a360fac..9648c3fe098 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -161,7 +161,8 @@ Status FlushToken::_try_reserve_memory(QueryThreadContext
query_thread_context,
if (memtable_flush_executor->check_and_inc_has_any_flushing_task()) {
// If there are already any flushing task, Wait for some time and
retry.
LOG_EVERY_T(INFO, 60) << fmt::format(
- "Failed to reserve memory {} for flush memtable, retry
after 100ms", size);
+ "Failed to reserve memory {} for flush memtable, retry
after 100ms",
+ PrettyPrinter::print_bytes(size));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
} else {
st = Status::OK();
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 8cc6ae58a4f..ad7f4e6f184 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -186,9 +186,9 @@ Status
PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized:
revocable_size = revocable_mem_size(state);
query_mem_limit = state->get_query_ctx()->get_mem_limit();
LOG(INFO) << fmt::format(
- "Query: {}, task {}, agg sink {} eos, need spill: {}, query
mem limit: {}, "
- "revocable memory: {}",
- print_id(state->query_id()), state->task_id(), node_id(),
+ "Query:{}, agg sink:{}, task:{}, eos, need spill:{}, query mem
limit:{}, "
+ "revocable memory:{}",
+ print_id(state->query_id()), node_id(), state->task_id(),
local_state._shared_state->is_spilled,
PrettyPrinter::print_bytes(query_mem_limit),
PrettyPrinter::print_bytes(revocable_size));
@@ -268,9 +268,9 @@ Status PartitionedAggSinkLocalState::revoke_memory(
RuntimeState* state, const std::shared_ptr<SpillContext>&
spill_context) {
const auto size_to_revoke = _parent->revocable_mem_size(state);
LOG(INFO) << fmt::format(
- "Query: {}, task {}, agg sink {} revoke_memory, eos: {}, need
spill: {}, revocable "
- "memory: {}",
- print_id(state->query_id()), state->task_id(), _parent->node_id(),
_eos,
+ "Query:{}, agg sink:{}, task:{}, revoke_memory, eos:{}, need
spill:{}, revocable "
+ "memory:{}",
+ print_id(state->query_id()), _parent->node_id(), state->task_id(),
_eos,
_shared_state->is_spilled,
PrettyPrinter::print_bytes(_parent->revocable_mem_size(state)));
if (!_shared_state->is_spilled) {
@@ -316,16 +316,17 @@ Status PartitionedAggSinkLocalState::revoke_memory(
Defer defer {[&]() {
if (!status.ok() || state->is_cancelled()) {
if (!status.ok()) {
- LOG(WARNING) << "Query " << print_id(query_id) <<
" agg node "
- << Base::_parent->node_id()
- << " revoke_memory error: " << status;
+ LOG(WARNING) << fmt::format(
+ "Query:{}, agg sink:{}, task:{},
revoke_memory error:{}",
+ print_id(query_id),
Base::_parent->node_id(), state->task_id(),
+ status);
}
_shared_state->close();
} else {
LOG(INFO) << fmt::format(
- "Query: {}, task {}, agg sink {} revoke_memory
finish, eos: {}, "
- "revocable memory: {}",
- print_id(state->query_id()), state->task_id(),
_parent->node_id(),
+ "Query:{}, agg sink:{}, task:{}, revoke_memory
finish, eos:{}, "
+ "revocable memory:{}",
+ print_id(state->query_id()),
_parent->node_id(), state->task_id(),
_eos,
PrettyPrinter::print_bytes(_parent->revocable_mem_size(state)));
}
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 8e221a1c7e2..c87ee24dedb 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -250,8 +250,9 @@ Status
PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
Defer defer {[&]() {
if (!status.ok() || state->is_cancelled()) {
if (!status.ok()) {
- LOG(WARNING) << "Query " << print_id(query_id) << " agg
node "
- << _parent->node_id() << " recover agg data
error: " << status;
+ LOG(WARNING) << fmt::format(
+ "Query:{}, agg probe:{}, task:{}, recover agg data
error:{}",
+ print_id(query_id), _parent->node_id(),
state->task_id(), status);
}
_shared_state->close();
}
@@ -305,15 +306,16 @@ Status
PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
}
}
- VLOG_DEBUG << "Query: " << print_id(query_id) << " agg node " <<
_parent->node_id()
- << ", task id: " << state->task_id() << " recover
partitioned finished, "
- << _shared_state->spill_partitions.size() << " partitions
left, "
- << accumulated_blocks_size
- << " bytes read, spill dep: " <<
(void*)(_spill_dependency.get());
+ VLOG_DEBUG << fmt::format(
+ "Query:{}, agg probe:{}, task:{}, recover partitioned
finished, partitions "
+ "left:{}, bytes read:{}, spill dep:{}",
+ print_id(query_id), _parent->node_id(), state->task_id(),
+ _shared_state->spill_partitions.size(),
accumulated_blocks_size,
+ (void*)(_spill_dependency.get()));
return status;
};
- auto exception_catch_func = [spill_func, query_id]() {
+ auto exception_catch_func = [this, state, spill_func, query_id]() {
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::merge_spill_data_cancel",
{
auto st = Status::InternalError(
"fault_inject partitioned_agg_source "
@@ -323,8 +325,9 @@ Status
PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
});
auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func();
}); }();
- LOG_IF(INFO, !status.ok()) << "Query : " << print_id(query_id)
- << " recover exception : " <<
status.to_string();
+ LOG_IF(INFO, !status.ok()) << fmt::format(
+ "Query:{}, agg probe:{}, task:{}, recover exception:{}",
print_id(query_id),
+ _parent->node_id(), state->task_id(), status.to_string());
return status;
};
@@ -334,10 +337,11 @@ Status
PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
});
_spill_dependency->block();
- VLOG_DEBUG << "Query: " << print_id(query_id) << " agg node " <<
_parent->node_id()
- << ", task id: " << state->task_id() << " begin to recover, "
- << _shared_state->spill_partitions.size()
- << " partitions left, _spill_dependency: " <<
(void*)(_spill_dependency.get());
+ VLOG_DEBUG << fmt::format(
+ "Query:{}, agg probe:{}, task:{}, begin to recover, partitions
left:{}, "
+ "_spill_dependency:{}",
+ print_id(query_id), _parent->node_id(), state->task_id(),
+ _shared_state->spill_partitions.size(),
(void*)(_spill_dependency.get()));
return
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
std::make_shared<SpillRecoverRunnable>(state, _spill_dependency,
_runtime_profile.get(),
_shared_state->shared_from_this(),
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 04b83e822c1..ff9c78c5be4 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -230,9 +230,11 @@ Status
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
}
COUNTER_SET(_probe_blocks_bytes, int64_t(not_revoked_size));
- VLOG_DEBUG << "Query: " << print_id(query_id)
- << " hash probe revoke done, node: " << p.node_id()
- << ", task: " << state->task_id();
+
+ VLOG_DEBUG << fmt::format(
+ "Query:{}, hash join probe:{}, task:{},"
+ " spill_probe_blocks done",
+ print_id(query_id), p.node_id(), state->task_id());
return Status::OK();
};
@@ -275,9 +277,10 @@ Status
PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in
Status
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(RuntimeState*
state,
uint32_t partition_index,
bool& has_data) {
- VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " <<
_parent->node_id()
- << ", task id: " << state->task_id() << ", partition: " <<
partition_index
- << " recover_build_blocks_from_disk";
+ VLOG_DEBUG << fmt::format(
+ "Query:{}, hash join probe:{}, task:{},"
+ " partition:{}, recover_build_blocks_from_disk",
+ print_id(state->query_id()), _parent->node_id(), state->task_id(),
partition_index);
auto& spilled_stream = _shared_state->spilled_streams[partition_index];
has_data = false;
if (!spilled_stream) {
@@ -291,9 +294,10 @@ Status
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
SCOPED_TIMER(_recovery_build_timer);
bool eos = false;
- VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: "
<< _parent->node_id()
- << ", task id: " << state->task_id() << ", partition: " <<
partition_index
- << ", recoverying build data";
+ VLOG_DEBUG << fmt::format(
+ "Query:{}, hash join probe:{}, task:{},"
+                " partition:{}, recovering build data",
+ print_id(state->query_id()), _parent->node_id(),
state->task_id(), partition_index);
Status status;
while (!eos) {
vectorized::Block block;
@@ -315,7 +319,11 @@ Status
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
}
if (UNLIKELY(state->is_cancelled())) {
- LOG(INFO) << "recovery build block when canceled.";
+ LOG(INFO) << fmt::format(
+ "Query:{}, hash join probe:{}, task:{},"
+ " partition:{}, recovery build data canceled",
+ print_id(state->query_id()), _parent->node_id(),
state->task_id(),
+ partition_index);
break;
}
@@ -338,9 +346,11 @@ Status
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
if (eos) {
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
_shared_state->spilled_streams[partition_index].reset();
- VLOG_DEBUG << "Query: " << print_id(state->query_id())
- << ", node: " << _parent->node_id() << ", task id: " <<
state->task_id()
- << ", partition: " << partition_index;
+ VLOG_DEBUG << fmt::format(
+ "Query:{}, hash join probe:{}, task:{},"
+ " partition:{}, recovery build data eos",
+ print_id(state->query_id()), _parent->node_id(),
state->task_id(),
+ partition_index);
}
return status;
};
@@ -365,16 +375,6 @@ Status
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
auto* spill_io_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
has_data = true;
_spill_dependency->block();
- {
- auto* pipeline_task = state->get_task();
- if (pipeline_task) {
- auto& p = _parent->cast<PartitionedHashJoinProbeOperatorX>();
- VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node:
" << p.node_id()
- << ", task id: " << state->task_id() << ", partition: "
<< partition_index
- << ", dependency: " << _dependency
- << ", task debug_string: " <<
pipeline_task->debug_string();
- }
- }
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recovery_build_blocks_submit_func",
{
@@ -386,9 +386,6 @@ Status
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
auto spill_runnable = std::make_shared<SpillRecoverRunnable>(
state, _spill_dependency, _runtime_profile.get(),
_shared_state->shared_from_this(),
exception_catch_func);
- VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " <<
_parent->node_id()
- << ", task id: " << state->task_id() << ", partition: " <<
partition_index
- << " recover_build_blocks_from_disk submit func";
return spill_io_pool->submit(std::move(spill_runnable));
}
@@ -429,7 +426,7 @@ Status
PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim
auto query_id = state->query_id();
- auto read_func = [this, query_id, &spilled_stream, &blocks] {
+ auto read_func = [this, query_id, partition_index, &spilled_stream,
&blocks] {
SCOPED_TIMER(_recovery_probe_timer);
vectorized::Block block;
@@ -457,8 +454,10 @@ Status
PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim
}
}
if (eos) {
- VLOG_DEBUG << "Query: " << print_id(query_id)
- << ", recovery probe data done: " <<
spilled_stream->get_spill_dir();
+ VLOG_DEBUG << fmt::format(
+ "Query:{}, hash join probe:{}, task:{},"
+ " partition:{}, recovery probe data done",
+ print_id(query_id), _parent->node_id(), _state->task_id(),
partition_index);
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
spilled_stream.reset();
}
@@ -675,13 +674,13 @@ Status
PartitionedHashJoinProbeOperatorX::_setup_internal_operators(
RETURN_IF_ERROR(_inner_sink_operator->sink(local_state._shared_state->inner_runtime_state.get(),
&block, true));
- VLOG_DEBUG << "Query: " << print_id(state->query_id())
- << ", internal build operator finished, node id: " << node_id()
- << ", task id: " << state->task_id()
- << ", partition: " << local_state._partition_cursor << "rows: "
<< block.rows()
- << ", usage: "
- << _inner_sink_operator->get_memory_usage(
-
local_state._shared_state->inner_runtime_state.get());
+ VLOG_DEBUG << fmt::format(
+ "Query:{}, hash join probe:{}, task:{},"
+ " internal build operator finished, partition:{}, rows:{}, memory
usage:{}",
+ print_id(state->query_id()), node_id(), state->task_id(),
local_state._partition_cursor,
+ block.rows(),
+ _inner_sink_operator->get_memory_usage(
+ local_state._shared_state->inner_runtime_state.get()));
COUNTER_SET(local_state._hash_table_memory_usage,
sink_local_state->profile()->get_counter("MemoryUsageHashTable")->value());
@@ -734,9 +733,10 @@ Status
PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
if (!has_data) {
vectorized::Block block;
RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state,
&block, true));
- VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ",
node: " << node_id()
- << ", task: " << state->task_id() << "partition: "
<< partition_index
- << " has no data to recovery";
+ VLOG_DEBUG << fmt::format(
+ "Query:{}, hash join probe:{}, task:{},"
                            " partition:{}, has no data to recover",
+ print_id(state->query_id()), node_id(),
state->task_id(), partition_index);
break;
} else {
return Status::OK();
@@ -755,9 +755,11 @@ Status
PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
*eos = false;
if (in_mem_eos) {
- VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: "
<< node_id()
- << ", task: " << state->task_id()
- << ", partition: " << local_state._partition_cursor;
+ VLOG_DEBUG << fmt::format(
+ "Query:{}, hash join probe:{}, task:{},"
+ " partition:{}, probe done",
+ print_id(state->query_id()), node_id(), state->task_id(),
+ local_state._partition_cursor);
local_state._partition_cursor++;
if (local_state._partition_cursor == _partition_count) {
*eos = true;
@@ -848,8 +850,8 @@ size_t
PartitionedHashJoinProbeOperatorX::get_reserve_mem_size(RuntimeState* sta
Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state) {
auto& local_state = get_local_state(state);
- VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", hash probe
node: " << node_id()
- << ", task: " << state->task_id();
+ VLOG_DEBUG << fmt::format("Query:{}, hash join probe:{}, task:{},
revoke_memory",
+ print_id(state->query_id()), node_id(),
state->task_id());
RETURN_IF_ERROR(local_state.spill_probe_blocks(state));
return Status::OK();
@@ -894,10 +896,10 @@ Status
PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
#ifndef NDEBUG
Defer eos_check_defer([&] {
if (*eos) {
- LOG(INFO) << "Query: " << print_id(state->query_id())
- << ", hash probe node: " << node_id() << ", task: " <<
state->task_id()
- << ", eos with child eos: " << local_state._child_eos
- << ", need spill: " << need_to_spill;
+ LOG(INFO) << fmt::format(
+ "Query:{}, hash join probe:{}, task:{}, child eos:{}, need
spill:{}",
+ print_id(state->query_id()), node_id(), state->task_id(),
+ local_state._child_eos, need_to_spill);
}
});
#endif
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 2e2c38f04c3..a227d87aa1b 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -178,8 +178,10 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
}
if (build_block.rows() <= 1) {
- LOG(WARNING) << "has no data to revoke, node: " << _parent->node_id()
- << ", task: " << state->task_id();
+ LOG(WARNING) << fmt::format(
+ "Query:{}, hash join sink:{}, task:{},"
+ " has no data to revoke",
+ print_id(state->query_id()), _parent->node_id(),
state->task_id());
if (spill_context) {
spill_context->on_task_finished();
}
@@ -270,9 +272,9 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
});
status = _finish_spilling();
VLOG_DEBUG << fmt::format(
- "Query: {}, task {}, hash join sink {}
_revoke_unpartitioned_block "
+ "Query:{}, hash join sink:{}, task:{},
_revoke_unpartitioned_block, "
"set_ready_to_read",
- print_id(state->query_id()), state->task_id(),
_parent->node_id());
+ print_id(state->query_id()), _parent->node_id(),
state->task_id());
_dependency->set_ready_to_read();
}
@@ -303,9 +305,9 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
Status PartitionedHashJoinSinkLocalState::revoke_memory(
RuntimeState* state, const std::shared_ptr<SpillContext>&
spill_context) {
SCOPED_TIMER(_spill_total_timer);
- VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", task " <<
state->task_id()
- << " hash join sink " << _parent->node_id() << " revoke_memory"
- << ", eos: " << _child_eos;
+ VLOG_DEBUG << fmt::format("Query:{}, hash join sink:{}, task:{},
revoke_memory, eos:{}",
+ print_id(state->query_id()), _parent->node_id(),
state->task_id(),
+ _child_eos);
CHECK_EQ(_spill_dependency->is_blocked_by(nullptr), nullptr);
if (!_shared_state->need_to_spill) {
@@ -322,9 +324,9 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
auto spill_fin_cb = [this, state, query_id, spill_context]() {
Status status;
if (_child_eos) {
- LOG(INFO) << "Query:" << print_id(this->state()->query_id()) << ",
task "
- << state->task_id() << " hash join sink " <<
_parent->node_id()
- << " finish spilling, set_ready_to_read";
+ LOG(INFO) << fmt::format(
+ "Query:{}, hash join sink:{}, task:{}, finish spilling,
set_ready_to_read",
+ print_id(this->state()->query_id()), _parent->node_id(),
state->task_id());
std::for_each(_shared_state->partitioned_build_blocks.begin(),
_shared_state->partitioned_build_blocks.end(),
[&](auto& block) {
if (block) {
@@ -565,10 +567,9 @@ Status
PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
revocable_size = revocable_mem_size(state);
query_mem_limit = state->get_query_ctx()->get_mem_limit();
LOG(INFO) << fmt::format(
- "Query: {}, task {}, hash join sink {} eos, need spill: {},
query mem limit: {}, "
- "revocable "
- "memory: {}",
- print_id(state->query_id()), state->task_id(), node_id(),
need_to_spill,
+ "Query:{}, hash join sink:{}, task:{}, eos, need spill:{},
query mem limit:{}, "
+ "revocable memory:{}",
+ print_id(state->query_id()), node_id(), state->task_id(),
need_to_spill,
PrettyPrinter::print_bytes(query_mem_limit),
PrettyPrinter::print_bytes(revocable_size));
}
@@ -590,9 +591,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState*
state, vectorized::B
if (is_revocable_mem_high_watermark(state, revocable_size,
query_mem_limit)) {
LOG(INFO) << fmt::format(
- "Query: {}, task {}, hash join sink {} eos,
revoke_memory "
+ "Query:{}, hash join sink:{}, task:{} eos,
revoke_memory "
"because revocable memory is high",
- print_id(state->query_id()), state->task_id(),
node_id());
+ print_id(state->query_id()), node_id(),
state->task_id());
return revoke_memory(state, nullptr);
}
@@ -601,10 +602,9 @@ Status
PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
local_state._shared_state->inner_runtime_state.get(),
in_block, eos));
LOG(INFO) << fmt::format(
- "Query: {}, task {}, hash join sink {} eos,
set_ready_to_read, nonspill "
- "memory "
- "usage: {}",
- print_id(state->query_id()), state->task_id(),
node_id(),
+ "Query:{}, hash join sink:{}, task:{}, eos,
set_ready_to_read, nonspill "
+ "memory usage:{}",
+ print_id(state->query_id()), node_id(),
state->task_id(),
_inner_sink_operator->get_memory_usage_debug_str(
local_state._shared_state->inner_runtime_state.get()));
}
@@ -642,9 +642,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState*
state, vectorized::B
if (eos) {
if (is_revocable_mem_high_watermark(state, revocable_size,
query_mem_limit)) {
LOG(INFO) << fmt::format(
- "Query: {}, task {}, hash join sink {} eos,
revoke_memory "
+ "Query:{}, hash join sink:{}, task:{}, eos,
revoke_memory "
"because revocable memory is high",
- print_id(state->query_id()), state->task_id(),
node_id());
+ print_id(state->query_id()), node_id(),
state->task_id());
return revoke_memory(state, nullptr);
}
}
@@ -653,9 +653,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState*
state, vectorized::B
local_state.update_memory_usage();
if (eos) {
LOG(INFO) << fmt::format(
- "Query: {}, task {}, hash join sink {} eos,
set_ready_to_read, nonspill memory "
- "usage: {}",
- print_id(state->query_id()), state->task_id(), node_id(),
+ "Query:{}, hash join sink:{}, task:{}, eos,
set_ready_to_read, nonspill memory "
+ "usage:{}",
+ print_id(state->query_id()), node_id(), state->task_id(),
_inner_sink_operator->get_memory_usage_debug_str(
local_state._shared_state->inner_runtime_state.get()));
local_state._dependency->set_ready_to_read();
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 03c4072f7de..debe1d59710 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -195,9 +195,9 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState*
state,
profile()->add_info_string("Spilled", "true");
}
- VLOG_DEBUG << "Query " << print_id(state->query_id()) << " sort node "
- << Base::_parent->node_id() << " revoke_memory"
- << ", eos: " << _eos;
+ VLOG_DEBUG << fmt::format("Query:{}, sort sink:{}, task:{}, revoke_memory,
eos:{}",
+ print_id(state->query_id()), _parent->node_id(),
state->task_id(),
+ _eos);
auto status =
ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
state, _spilling_stream, print_id(state->query_id()), "sort",
_parent->node_id(),
@@ -219,13 +219,14 @@ Status
SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
Defer defer {[&]() {
if (!status.ok() || state->is_cancelled()) {
if (!status.ok()) {
- LOG(WARNING) << "Query " << print_id(query_id) << " sort
node "
- << _parent->node_id() << " revoke memory
error: " << status;
+ LOG(WARNING) << fmt::format(
+ "Query:{}, sort sink:{}, task:{}, revoke memory
error:{}",
+ print_id(query_id), _parent->node_id(),
state->task_id(), status);
}
_shared_state->close();
} else {
- VLOG_DEBUG << "Query " << print_id(query_id) << " sort node "
<< _parent->node_id()
- << " revoke memory finish";
+ VLOG_DEBUG << fmt::format("Query:{}, sort sink:{}, task:{},
revoke memory finish",
+ print_id(query_id),
_parent->node_id(), state->task_id());
}
if (!status.ok()) {
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp
b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index 8a58d0b1504..43bb8a65b6e 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -70,8 +70,8 @@ int
SpillSortLocalState::_calc_spill_blocks_to_merge(RuntimeState* state) const
}
Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState*
state) {
auto& parent = Base::_parent->template cast<Parent>();
- VLOG_DEBUG << "Query " << print_id(state->query_id()) << " sort node " <<
_parent->node_id()
- << " merge spill data";
+ VLOG_DEBUG << fmt::format("Query:{}, sort source:{}, task:{}, merge spill
data",
+ print_id(state->query_id()), _parent->node_id(),
state->task_id());
_spill_dependency->Dependency::block();
auto query_id = state->query_id();
@@ -82,8 +82,9 @@ Status
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
Defer defer {[&]() {
if (!status.ok() || state->is_cancelled()) {
if (!status.ok()) {
- LOG(WARNING) << "Query " << print_id(query_id) << " sort
node "
- << _parent->node_id() << " merge spill data
error: " << status;
+ LOG(WARNING) << fmt::format(
+ "Query:{}, sort source:{}, task:{}, merge spill
data error:{}",
+ print_id(query_id), _parent->node_id(),
state->task_id(), status);
}
_shared_state->close();
for (auto& stream : _current_merging_streams) {
@@ -91,18 +92,20 @@ Status
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
}
_current_merging_streams.clear();
} else {
- VLOG_DEBUG << "Query " << print_id(query_id) << " sort node "
<< _parent->node_id()
- << " merge spill data finish";
+ VLOG_DEBUG << fmt::format(
+ "Query:{}, sort source:{}, task:{}, merge spill data
finish",
+ print_id(query_id), _parent->node_id(),
state->task_id());
}
}};
vectorized::Block merge_sorted_block;
vectorized::SpillStreamSPtr tmp_stream;
while (!state->is_cancelled()) {
int max_stream_count = _calc_spill_blocks_to_merge(state);
- VLOG_DEBUG << "Query " << print_id(query_id) << " sort node " <<
_parent->node_id()
- << " merge spill streams, streams count: "
- << _shared_state->sorted_streams.size()
- << ", curren merge max stream count: " <<
max_stream_count;
+ VLOG_DEBUG << fmt::format(
+ "Query:{}, sort source:{}, task:{}, merge spill streams,
streams count:{}, "
+                "current merge max stream count:{}",
+ print_id(query_id), _parent->node_id(), state->task_id(),
+ _shared_state->sorted_streams.size(), max_stream_count);
{
SCOPED_TIMER(Base::_spill_recover_time);
status = _create_intermediate_merger(
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index d8858dd5aba..e0fb08c43ba 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -381,10 +381,10 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
}
if (scan_task->cached_blocks.back().first->rows() > 0) {
- auto block_avg_bytes =
-
(scan_task->cached_blocks.back().first->allocated_bytes() +
- scan_task->cached_blocks.back().first->rows() -
1) /
- scan_task->cached_blocks.back().first->rows() *
ctx->batch_size();
+ auto block_avg_bytes =
(scan_task->cached_blocks.back().first->bytes() +
+
scan_task->cached_blocks.back().first->rows() - 1) /
+
scan_task->cached_blocks.back().first->rows() *
+ ctx->batch_size();
scanner->update_block_avg_bytes(block_avg_bytes);
}
if (ctx->low_memory_mode()) {
diff --git a/be/src/vec/spill/spill_reader.cpp
b/be/src/vec/spill/spill_reader.cpp
index 014b83be23d..40323f824a8 100644
--- a/be/src/vec/spill/spill_reader.cpp
+++ b/be/src/vec/spill/spill_reader.cpp
@@ -29,6 +29,7 @@
#include "runtime/exec_env.h"
#include "util/slice.h"
#include "vec/core/block.h"
+#include "vec/spill/spill_stream_manager.h"
namespace doris {
#include "common/compile_check_begin.h"
namespace io {
@@ -52,11 +53,12 @@ Status SpillReader::open() {
Slice result((char*)&block_count_, sizeof(size_t));
+ size_t total_read_bytes = 0;
// read block count
size_t bytes_read = 0;
RETURN_IF_ERROR(file_reader_->read_at(file_size - sizeof(size_t), result,
&bytes_read));
DCHECK(bytes_read == 8); // max_sub_block_size, block count
- COUNTER_UPDATE(_read_file_size, bytes_read);
+ total_read_bytes += bytes_read;
if (_query_statistics) {
_query_statistics->add_spill_read_bytes_from_local_storage(bytes_read);
}
@@ -66,7 +68,7 @@ Status SpillReader::open() {
result.data = (char*)&max_sub_block_size_;
RETURN_IF_ERROR(file_reader_->read_at(file_size - sizeof(size_t) * 2,
result, &bytes_read));
DCHECK(bytes_read == 8); // max_sub_block_size, block count
- COUNTER_UPDATE(_read_file_size, bytes_read);
+ total_read_bytes += bytes_read;
if (_query_statistics) {
_query_statistics->add_spill_read_bytes_from_local_storage(bytes_read);
}
@@ -87,7 +89,9 @@ Status SpillReader::open() {
RETURN_IF_ERROR(file_reader_->read_at(read_offset, result, &bytes_read));
DCHECK(bytes_read == block_count_ * sizeof(size_t));
- COUNTER_UPDATE(_read_file_size, bytes_read);
+ total_read_bytes += bytes_read;
+ COUNTER_UPDATE(_read_file_size, total_read_bytes);
+
ExecEnv::GetInstance()->spill_stream_mgr()->update_spill_read_bytes(total_read_bytes);
if (_query_statistics) {
_query_statistics->add_spill_read_bytes_from_local_storage(bytes_read);
}
@@ -134,6 +138,7 @@ Status SpillReader::read(Block* block, bool* eos) {
if (bytes_read > 0) {
COUNTER_UPDATE(_read_file_size, bytes_read);
+
ExecEnv::GetInstance()->spill_stream_mgr()->update_spill_read_bytes(bytes_read);
if (_query_statistics) {
_query_statistics->add_spill_read_bytes_from_local_storage(bytes_read);
}
diff --git a/be/src/vec/spill/spill_stream_manager.cpp
b/be/src/vec/spill/spill_stream_manager.cpp
index 07a947b5ef3..833c5471fca 100644
--- a/be/src/vec/spill/spill_stream_manager.cpp
+++ b/be/src/vec/spill/spill_stream_manager.cpp
@@ -43,6 +43,9 @@
namespace doris::vectorized {
#include "common/compile_check_begin.h"
+SpillStreamManager::~SpillStreamManager() {
+ DorisMetrics::instance()->metric_registry()->deregister_entity(_entity);
+}
SpillStreamManager::SpillStreamManager(
std::unordered_map<std::string,
std::unique_ptr<vectorized::SpillDataDir>>&&
spill_store_map)
@@ -84,9 +87,27 @@ Status SpillStreamManager::init() {
"Spill", "spill_gc_thread", [this]() {
this->_spill_gc_thread_callback(); },
&_spill_gc_thread));
LOG(INFO) << "spill gc thread started";
+
+ _init_metrics();
+
return Status::OK();
}
+void SpillStreamManager::_init_metrics() {
+ _entity =
DorisMetrics::instance()->metric_registry()->register_entity("spill",
+
{{"name", "spill"}});
+
+ _spill_write_bytes_metric = std::make_unique<doris::MetricPrototype>(
+ doris::MetricType::COUNTER, doris::MetricUnit::BYTES,
"spill_write_bytes");
+ _spill_write_bytes_counter =
(IntAtomicCounter*)(_entity->register_metric<IntAtomicCounter>(
+ _spill_write_bytes_metric.get()));
+
+ _spill_read_bytes_metric = std::make_unique<doris::MetricPrototype>(
+ doris::MetricType::COUNTER, doris::MetricUnit::BYTES,
"spill_read_bytes");
+ _spill_read_bytes_counter =
(IntAtomicCounter*)(_entity->register_metric<IntAtomicCounter>(
+ _spill_read_bytes_metric.get()));
+}
+
// clean up stale spilled files
void SpillStreamManager::_spill_gc_thread_callback() {
while (!_stop_background_threads_latch.wait_for(
diff --git a/be/src/vec/spill/spill_stream_manager.h
b/be/src/vec/spill/spill_stream_manager.h
index 7bcfe950097..53ae89e9111 100644
--- a/be/src/vec/spill/spill_stream_manager.h
+++ b/be/src/vec/spill/spill_stream_manager.h
@@ -30,6 +30,14 @@
namespace doris {
#include "common/compile_check_begin.h"
class RuntimeProfile;
+template <typename T>
+class AtomicCounter;
+using IntAtomicCounter = AtomicCounter<int64_t>;
+template <typename T>
+class AtomicGauge;
+using UIntGauge = AtomicGauge<uint64_t>;
+class MetricEntity;
+struct MetricPrototype;
namespace vectorized {
@@ -106,6 +114,7 @@ private:
};
class SpillStreamManager {
public:
+ ~SpillStreamManager();
SpillStreamManager(std::unordered_map<std::string,
std::unique_ptr<vectorized::SpillDataDir>>&&
spill_store_map);
@@ -133,7 +142,12 @@ public:
ThreadPool* get_spill_io_thread_pool() const { return
_spill_io_thread_pool.get(); }
+ void update_spill_write_bytes(int64_t bytes) {
_spill_write_bytes_counter->increment(bytes); }
+
+ void update_spill_read_bytes(int64_t bytes) {
_spill_read_bytes_counter->increment(bytes); }
+
private:
+ void _init_metrics();
Status _init_spill_store_map();
void _spill_gc_thread_callback();
std::vector<SpillDataDir*> _get_stores_for_spill(TStorageMedium::type
storage_medium);
@@ -145,6 +159,14 @@ private:
scoped_refptr<Thread> _spill_gc_thread;
std::atomic_uint64_t id_ = 0;
+
+ std::shared_ptr<MetricEntity> _entity {nullptr};
+
+ std::unique_ptr<doris::MetricPrototype> _spill_write_bytes_metric
{nullptr};
+ std::unique_ptr<doris::MetricPrototype> _spill_read_bytes_metric {nullptr};
+
+ IntAtomicCounter* _spill_write_bytes_counter {nullptr};
+ IntAtomicCounter* _spill_read_bytes_counter {nullptr};
};
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/spill/spill_writer.cpp
b/be/src/vec/spill/spill_writer.cpp
index 5cff9042103..3a576004091 100644
--- a/be/src/vec/spill/spill_writer.cpp
+++ b/be/src/vec/spill/spill_writer.cpp
@@ -59,6 +59,7 @@ Status SpillWriter::close() {
COUNTER_UPDATE(_write_file_current_size, meta_.size());
}
data_dir_->update_spill_data_usage(meta_.size());
+
ExecEnv::GetInstance()->spill_stream_mgr()->update_spill_write_bytes(meta_.size());
RETURN_IF_ERROR(file_writer_->close());
@@ -143,6 +144,7 @@ Status SpillWriter::_write_internal(const Block& block,
size_t& written_bytes) {
Defer defer {[&]() {
if (status.ok()) {
data_dir_->update_spill_data_usage(buff_size);
+
ExecEnv::GetInstance()->spill_stream_mgr()->update_spill_write_bytes(buff_size);
written_bytes += buff_size;
max_sub_block_size_ = std::max(max_sub_block_size_,
(size_t)buff_size);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]