This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new b1fd701493 [fix](memtracker) Improve memory tracking accuracy for exec nodes (#11947) b1fd701493 is described below commit b1fd701493919e89936662f84f151b3071cc4578 Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Mon Aug 22 08:56:05 2022 +0800 [fix](memtracker) Improve memory tracking accuracy for exec nodes (#11947) --- be/src/exec/blocking_join_node.cpp | 1 + be/src/exec/hash_join_node.cpp | 1 + be/src/exec/olap_scan_node.cpp | 2 ++ be/src/exec/tablet_sink.cpp | 4 +++- be/src/runtime/memory/mem_tracker_limiter.cpp | 8 ++++---- be/src/service/internal_service.cpp | 3 +++ be/src/vec/exec/join/vhash_join_node.cpp | 1 + be/src/vec/exec/vblocking_join_node.cpp | 1 + be/src/vec/exec/volap_scan_node.cpp | 2 ++ be/src/vec/sink/vtablet_sink.cpp | 1 + 10 files changed, 19 insertions(+), 5 deletions(-) diff --git a/be/src/exec/blocking_join_node.cpp b/be/src/exec/blocking_join_node.cpp index 10419a3499..5e5aee3714 100644 --- a/be/src/exec/blocking_join_node.cpp +++ b/be/src/exec/blocking_join_node.cpp @@ -93,6 +93,7 @@ Status BlockingJoinNode::close(RuntimeState* state) { void BlockingJoinNode::build_side_thread(RuntimeState* state, std::promise<Status>* status) { SCOPED_ATTACH_TASK(state); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); status->set_value(construct_build_side(state)); } diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp index cbba3c607c..f5a7fec132 100644 --- a/be/src/exec/hash_join_node.cpp +++ b/be/src/exec/hash_join_node.cpp @@ -179,6 +179,7 @@ Status HashJoinNode::close(RuntimeState* state) { void HashJoinNode::build_side_thread(RuntimeState* state, std::promise<Status>* status) { SCOPED_ATTACH_TASK(state); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); status->set_value(construct_hash_table(state)); } diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index 048a922bae..99caa8afb9 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -1480,6 +1480,7 @@ Status OlapScanNode::normalize_bloom_filter_predicate(SlotDescriptor* slot) { void OlapScanNode::transfer_thread(RuntimeState* state) { // scanner open pushdown to scanThread SCOPED_ATTACH_TASK(state); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); Status status = Status::OK(); for (auto scanner : _olap_scanners) { status = Expr::clone_if_not_exists(_conjunct_ctxs, state, scanner->conjunct_ctxs()); @@ -1663,6 +1664,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { void OlapScanNode::scanner_thread(OlapScanner* scanner) { SCOPED_ATTACH_TASK(_runtime_state); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); Thread::set_self_name("olap_scanner"); if (UNLIKELY(_transfer_done)) { _scanner_done = true; diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index cdb503551b..a1213d348d 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -468,8 +468,9 @@ int NodeChannel::try_send_and_fetch_status(RuntimeState* state, } void NodeChannel::try_send_batch(RuntimeState* state) { - SCOPED_ATTACH_TASK(state); SCOPED_ATOMIC_TIMER(&_actual_consume_ns); + SCOPED_ATTACH_TASK(state); + SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); AddBatchReq send_batch; { debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; @@ -1321,6 +1322,7 @@ Status OlapTableSink::_validate_data(RuntimeState* state, RowBatch* batch, Bitma void OlapTableSink::_send_batch_process(RuntimeState* state) { SCOPED_TIMER(_non_blocking_send_timer); SCOPED_ATTACH_TASK(state); + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); do { int running_channels_num = 0; for (auto index_channel : _channels) { diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 50eb011028..84209888ef 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -226,11 +226,11 @@ std::string MemTrackerLimiter::log_usage(int max_recursive_depth, Status MemTrackerLimiter::mem_limit_exceeded_log(const std::string& msg) { DCHECK(_limit != -1); std::string detail = fmt::format( - "{}, backend={} free memory left={}. If is query, can change the limit by `set " - "exec_mem_limit=xxx`, details mem usage see be.INFO.", + "{}, backend={} memory used={}, free memory left={}. If is query, can change the limit " + "by `set exec_mem_limit=xxx`, details mem usage see be.INFO.", msg, BackendOptions::get_localhost(), - PrettyPrinter::print(ExecEnv::GetInstance()->process_mem_tracker()->spare_capacity(), - TUnit::BYTES)); + PrettyPrinter::print(PerfCounters::get_vm_rss(), TUnit::BYTES), + PrettyPrinter::print(MemInfo::mem_limit() - PerfCounters::get_vm_rss(), TUnit::BYTES)); Status status = Status::MemoryLimitExceeded(detail); // only print the tracker log_usage in be log. diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 02a52ce0e5..87f89f48e0 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -263,6 +263,7 @@ void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcControl int64_t execution_time_ns = 0; { SCOPED_RAW_TIMER(&execution_time_ns); + SCOPED_SWITCH_BTHREAD_TLS(); auto st = _exec_env->load_channel_mgr()->add_batch(*request, response); if (!st.ok()) { @@ -320,6 +321,8 @@ void PInternalServiceImpl::_tablet_writer_add_batch(google::protobuf::RpcControl int64_t execution_time_ns = 0; { SCOPED_RAW_TIMER(&execution_time_ns); + SCOPED_SWITCH_BTHREAD_TLS(); + // TODO(zxy) delete in 1.2 version brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); attachment_transfer_request_row_batch<PTabletWriterAddBatchRequest>(request, cntl); diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index d27c0aea06..228e0020e8 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -1114,6 +1114,7 @@ Status HashJoinNode::open(RuntimeState* state) { void HashJoinNode::_hash_table_build_thread(RuntimeState* state, std::promise<Status>* status) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "HashJoinNode::_hash_table_build_thread"); SCOPED_ATTACH_TASK(state); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); status->set_value(_hash_table_build(state)); } diff --git a/be/src/vec/exec/vblocking_join_node.cpp b/be/src/vec/exec/vblocking_join_node.cpp index f98005ca9c..c6d548de91 100644 --- a/be/src/vec/exec/vblocking_join_node.cpp +++ b/be/src/vec/exec/vblocking_join_node.cpp @@ -75,6 +75,7 @@ Status VBlockingJoinNode::close(RuntimeState* state) { void VBlockingJoinNode::build_side_thread(RuntimeState* state, std::promise<Status>* status) { SCOPED_ATTACH_TASK(state); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); status->set_value(construct_build_side(state)); // Release the thread token as soon as possible (before the main thread joins // on it). This way, if we had a chain of 10 joins using 1 additional thread, diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp index ab2f9eda38..5542eefeb0 100644 --- a/be/src/vec/exec/volap_scan_node.cpp +++ b/be/src/vec/exec/volap_scan_node.cpp @@ -274,6 +274,7 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) { // scanner open pushdown to scanThread START_AND_SCOPE_SPAN(state->get_tracer(), span, "VOlapScanNode::transfer_thread"); SCOPED_ATTACH_TASK(state); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); Status status = Status::OK(); if (_vconjunct_ctx_ptr) { @@ -386,6 +387,7 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) { void VOlapScanNode::scanner_thread(VOlapScanner* scanner) { SCOPED_ATTACH_TASK(_runtime_state); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); Thread::set_self_name("volap_scanner"); int64_t wait_time = scanner->update_wait_worker_timer(); // Do not use ScopedTimer. There is no guarantee that, the counter diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index 7a0faee0d6..3cbc73348c 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -235,6 +235,7 @@ int VNodeChannel::try_send_and_fetch_status(RuntimeState* state, void VNodeChannel::try_send_block(RuntimeState* state) { SCOPED_ATTACH_TASK(state); + SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get()); SCOPED_ATOMIC_TIMER(&_actual_consume_ns); AddBlockReq send_block; { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org